How to Use Threads for IO Tasks in Python

Using different methods such as ThreadPoolExecutor or the threading module to create and handle threads for speeding up I/O-bound tasks in Python.
  · 9 min read · Updated Jul 2020 · Python Standard Library


In computing, a thread is a sequence of programmed instructions to be executed within a program. Two threads running in a single Python program run concurrently (i.e., not in parallel). Unlike processes, threads in Python don't run on separate CPU cores; they share the same memory space and can efficiently read and write to the same variables.

Threads are like mini-processes; in fact, some people call them lightweight processes, because threads can live inside a process and do the work of a process. Still, they are quite different. Here are the main differences between threads and processes in Python:

Process

  • A process can contain one or more threads
  • Processes have separate memory spaces
  • Two processes can run on different CPU cores (inter-process communication is harder, but you get true parallelism)
  • Processes have more overhead than threads (creating and destroying processes takes more time)
  • Running multiple processes is only effective for CPU-bound tasks

Thread

  • Threads share the same memory space and can read and write to shared variables (with synchronization, of course)
  • Two threads in a single Python program cannot execute at the same time
  • Running multiple threads is only effective for I/O-bound tasks
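To make the shared-memory point concrete, here is a minimal sketch (a hypothetical counter example, not from this article) where two threads increment the same global variable, synchronized with a threading.Lock:

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # the lock makes the read-modify-write on counter atomic
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 200000: both threads updated the same variable
```

Without the lock, the two increments could interleave and lose updates; with it, the final count is always the sum of both threads' work.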

You may now be wondering: why use threads at all if they can't run simultaneously? Before answering that, let's see why threads can't run at the same time.

Python's GIL

Perhaps the most controversial topic among Python developers: the GIL stands for Global Interpreter Lock, a lock that prevents two threads from executing Python bytecode simultaneously in the same interpreter. Some people dislike it, while others claim it isn't a problem, since libraries such as NumPy bypass the limitation by doing their heavy work in external C code.

So why use threads at all? Well, Python releases the GIL while waiting for blocking I/O to complete. If your code makes a request to an API, queries a database on disk, or downloads a file from the internet, that waiting happens outside the GIL, so other threads are free to acquire it and run in the meantime. In a nutshell, we only benefit from threads for I/O-bound work.
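As a quick illustration, here is a small sketch (not from this article) using time.sleep() as a stand-in for real I/O, since sleeping also releases the GIL; two overlapping "waits" finish in roughly the time of one:

```python
import threading
import time
from time import perf_counter

def fake_io():
    # time.sleep() releases the GIL, just like waiting on a socket or disk
    time.sleep(0.5)

# sequential: the two waits add up to about 1.0s
t = perf_counter()
fake_io()
fake_io()
sequential = perf_counter() - t

# threaded: both waits overlap, so total is about 0.5s
t = perf_counter()
threads = [threading.Thread(target=fake_io) for _ in range(2)]
for th in threads:
    th.start()
for th in threads:
    th.join()
threaded = perf_counter() - t

print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

If fake_io() were a CPU-bound loop instead, the threaded version would show no speedup, because the GIL would never be released while computing.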

Single Thread

For demonstration, the code below downloads some files from the internet (a perfect I/O-bound task) sequentially, without using threads (it requires requests to be installed; just run pip3 install requests):

import requests
from time import perf_counter

# read 1024 bytes every time 
buffer_size = 1024

def download(url):
    # download the body of response by chunk, not immediately
    response = requests.get(url, stream=True)
    # get the file name
    filename = url.split("/")[-1]
    with open(filename, "wb") as f:
        for data in response.iter_content(buffer_size):
            # write data read to the file
            f.write(data)

if __name__ == "__main__":
    urls = [
        "https://cdn.pixabay.com/photo/2018/01/14/23/12/nature-3082832__340.jpg",
        "https://cdn.pixabay.com/photo/2013/10/02/23/03/dawn-190055__340.jpg",
        "https://cdn.pixabay.com/photo/2016/10/21/14/50/plouzane-1758197__340.jpg",
        "https://cdn.pixabay.com/photo/2016/11/29/05/45/astronomy-1867616__340.jpg",
        "https://cdn.pixabay.com/photo/2014/07/28/20/39/landscape-404072__340.jpg",
    ] * 5

    t = perf_counter()
    for url in urls:
        download(url)
    print(f"Time took: {perf_counter() - t:.2f}s")

Once you execute it, you'll notice new images appear in the current directory and you'll get something like this as output:

Time took: 13.76s

The above code is pretty straightforward: it iterates over the URLs and downloads the images one by one, which took about 13.8s (this will vary depending on your Internet connection). Either way, we're spending most of that time just waiting; if you need performance, consider using threads.

Multiple Threads

import requests
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter

# number of threads to spawn
n_threads = 5
# read 1024 bytes every time 
buffer_size = 1024

def download(url):
    # download the body of response by chunk, not immediately
    response = requests.get(url, stream=True)
    # get the file name
    filename = url.split("/")[-1]
    with open(filename, "wb") as f:
        for data in response.iter_content(buffer_size):
            # write data read to the file
            f.write(data)

if __name__ == "__main__":
    urls = [
        "https://cdn.pixabay.com/photo/2018/01/14/23/12/nature-3082832__340.jpg",
        "https://cdn.pixabay.com/photo/2013/10/02/23/03/dawn-190055__340.jpg",
        "https://cdn.pixabay.com/photo/2016/10/21/14/50/plouzane-1758197__340.jpg",
        "https://cdn.pixabay.com/photo/2016/11/29/05/45/astronomy-1867616__340.jpg",
        "https://cdn.pixabay.com/photo/2014/07/28/20/39/landscape-404072__340.jpg",
    ] * 5
    t = perf_counter()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        pool.map(download, urls)
    print(f"Time took: {perf_counter() - t:.2f}s")

The code has changed only a little: we now use the ThreadPoolExecutor class from the concurrent.futures module. It creates a pool with the number of threads we specify, and then handles distributing the urls list across those threads via the pool.map() method.
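Besides map(), ThreadPoolExecutor also offers submit(), which returns a Future per task so you can collect results as they finish. Here is a hedged sketch of that pattern; fetch_length and the URLs are made up for illustration (a real version would download each URL):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_length(url):
    # stand-in for real download work; returns something we can collect
    return len(url)

urls = ["https://example.com/a.jpg", "https://example.com/bb.jpg"]

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() schedules one task per URL and returns a Future for each
    futures = {pool.submit(fetch_length, url): url for url in urls}
    # as_completed() yields each Future as soon as its task finishes
    for future in as_completed(futures):
        print(futures[future], "->", future.result())
```

This is handy when tasks finish at different speeds and you want to process each result immediately rather than waiting for all of them.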

Here is how long it lasted for me:

Time took: 3.85s

That is about 3.6x faster (at least for me) using 5 threads. Try tuning the number of threads on your machine and see if you can optimize it further.

That's not the only way to create threads, though; you can also use the threading module together with a queue. Here is equivalent code:

import requests
from threading import Thread
from queue import Queue

# thread-safe queue initialization
q = Queue()
# number of threads to spawn
n_threads = 5
# read 1024 bytes every time 
buffer_size = 1024

def download():
    while True:
        # get the url from the queue
        url = q.get()
        # download the body of response by chunk, not immediately
        response = requests.get(url, stream=True)
        # get the file name
        filename = url.split("/")[-1]
        with open(filename, "wb") as f:
            for data in response.iter_content(buffer_size):
                # write data read to the file
                f.write(data)
        # we're done downloading the file
        q.task_done()

if __name__ == "__main__":
    urls = [
        "https://cdn.pixabay.com/photo/2018/01/14/23/12/nature-3082832__340.jpg",
        "https://cdn.pixabay.com/photo/2013/10/02/23/03/dawn-190055__340.jpg",
        "https://cdn.pixabay.com/photo/2016/10/21/14/50/plouzane-1758197__340.jpg",
        "https://cdn.pixabay.com/photo/2016/11/29/05/45/astronomy-1867616__340.jpg",
        "https://cdn.pixabay.com/photo/2014/07/28/20/39/landscape-404072__340.jpg",
    ] * 5
    # fill the queue with all the urls
    for url in urls:
        q.put(url)
    # start the threads
    for _ in range(n_threads):
        worker = Thread(target=download)
        # daemon thread means a thread that will end when the main thread ends
        worker.daemon = True
        worker.start()
    # wait until the queue is empty
    q.join()

This is a good alternative too: we fill a synchronized queue with all the image URLs we want to download, then manually spawn the threads, each of which executes the download() function.

As you may have already noticed, the download() function uses an infinite loop that never ends. This is counter-intuitive, but it makes sense once you know that the thread executing it is a daemon thread, which means it is killed as soon as the main thread exits.
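If you'd rather shut the workers down explicitly instead of relying on daemon threads, a common alternative (a sketch under assumed names, not from this article) is to push one sentinel value such as None per worker, and have each worker break out of its loop when it receives one:

```python
from queue import Queue
from threading import Thread

q = Queue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:
            # sentinel received: acknowledge it and exit the loop cleanly
            q.task_done()
            break
        results.append(item * 2)  # stand-in for the real download work
        q.task_done()

n_threads = 3
threads = [Thread(target=worker) for _ in range(n_threads)]
for t in threads:
    t.start()

for item in range(5):
    q.put(item)
# one sentinel per worker, so every thread eventually shuts down
for _ in range(n_threads):
    q.put(None)

q.join()
for t in threads:
    t.join()  # safe to join: every worker has broken out of its loop

print(sorted(results))  # [0, 2, 4, 6, 8]
```

Because the threads exit on their own, they don't need to be daemons, and the program can join them cleanly before doing any final cleanup.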

So we use the q.put() method to add an item, and q.get() to retrieve and consume it (in this case, download it). This is the classic producer-consumer pattern, widely discussed in computer science.

Now, what happens if two threads call q.get() (or q.put()) at the same time? As mentioned before, this queue is thread-safe (synchronized), meaning it uses a lock under the hood that prevents two threads from taking the same item simultaneously.

When we finish downloading a file, we call the q.task_done() method, which tells the queue that processing of that item is complete.

Returning to the main thread: we created the threads and started them with the start() method. After that, we need a way to block the main thread until all work is completed, and that is exactly what q.join() does; it blocks until every item in the queue has been retrieved and processed.

Conclusion

To conclude: first, you shouldn't use threads if you don't need to speed up your code. If you run it only once a month, you would just be adding complexity that makes debugging harder.

Second, if your code is heavily CPU-bound, you shouldn't use threads either, because of the GIL. If you want to run your code on multiple cores, use the multiprocessing module instead, which provides similar functionality but with processes rather than threads.
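For comparison, here is a hedged sketch of the process-based equivalent, using ProcessPoolExecutor from concurrent.futures (the count_primes function is a made-up stand-in for CPU-bound work):

```python
from concurrent.futures import ProcessPoolExecutor

def count_primes(limit):
    # naive CPU-bound work: count the primes below limit
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

if __name__ == "__main__":
    limits = [10_000, 10_000, 10_000, 10_000]
    # each task runs in its own process, so each has its own GIL
    with ProcessPoolExecutor() as pool:
        results = list(pool.map(count_primes, limits))
    print(results)
```

The API mirrors ThreadPoolExecutor, so switching between the two is often a one-line change; the `if __name__ == "__main__"` guard is required because child processes re-import the script on some platforms.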

Third, you should only use threads on I/O tasks such as writing to a disk, waiting for a network resource, etc.

Finally, use ThreadPoolExecutor when you have all the items to process before you start consuming them. However, if the items aren't pre-defined and are discovered while the code is executing (as is often the case in web scraping), consider using the synchronized queue with the threading module.

Happy Coding ♥
