Asynchronous Programming

Learning Goals

  • Software design approaches and patterns, to identify reusable solutions to commonly occurring problems
  • Apply an appropriate software development approach according to the relevant paradigm (for example object oriented, event driven or procedural)

This topic looks at concurrent function calls in Python.

Async Features in Python

Python is a single-threaded language due to the Global Interpreter Lock (GIL). Since GIL allows only one thread to execute at a time, a multi-processing library is required to achieve true parallelism.

  • Parallelism consists of performing multiple operations at the same time. Multiprocessing means spreading the workload over several CPUs or cores. It is well suited to CPU-bound scenarios (i.e. when you genuinely want to run code in parallel).

  • Concurrency is the process of multiple tasks being run in an overlapping manner. Concurrency does not imply parallelism.

  • Threading is a form of concurrency where multiple threads take turns executing different tasks. Due to Python being a single-threaded language, it has a complicated relationship with threading.

There are threading and muliprocessing libraries in Python. This reading will focus on the asyncio library which is best for IO-bound scenarios (e.g. making external calls via HTTP or to a DB) rather than CPU-bound scenarios. asyncio event loops are single-threaded but requires the use of async-compatible libraries to truly benefit from the framework. For an example of this see the HTTPX library which has async support or the motor library for working with MongoDB databases.

Coroutines

What are coroutines functions you ask? They are functions that schedule the execution of the events i.e. not immediately after they have been invoked. The Python package asyncio is used to write concurrent code and will be used in all the examples. Async io introduces two keywords: async and await which help define and run coroutines. The asyncio.run() function starts the event loop and runs the coroutine and asyncio.gather() runs multiple coroutines concurrently.

import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count())

asyncio.run(main())

# prints: One One Two Two

As you have seen in the example above, both Two statements are printed after both One statements due to the print('Two') being executed after asyncio.sleep. Take note that if you were to define without the async key word and using time.sleep() it would print out in a different order:

import time

def count():
    print("One")
    time.sleep(1)
    print("Two")

def main():
    for i in range(2):
        count()

# prints: One Two One Two

This synchronous version executes in order and takes longer to run. This is due to time.sleep() being a blocking function whilst asyncio.sleep() is an asynchronous non-blocking call.

async/ await

The syntax async def creates a coroutine. The expressions async with and async for are also valid:

# create and use the thread pool
async with ThreadPool() as pool:
    # use the thread pool...
# closed automatically
# traverse an asynchronous iterator
async for item in async_iterator:
    print(item)

An object is called awaitable if it can be used with the await keyword. There are three main types of awaitable objects: coroutines, tasks, and futures. They are explained in more detail below. The keyword await passes function control back to the event loop. For example:

async def a():
    c = await b()
    return c

The await means that the function pauses above it and comes back to a() when b() is ready. In the meantime it lets something else run.

You can only use await in the body of coroutines. For example, the following will throw a syntax error.

def m(x):
    y = await z(x) 
    return y

This is because the await is outside of the async def coroutine. This could be fixed by changing def to async def.

A very common use-case of asynchronous functions and callbacks is HTTP requests, e.g. when fetching some data from another API:

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.read()
            print(f"Response received:{data}")

asyncio.run(main())

The response is only executed when the HTTP request has successfully made its way across the network and back.

This is a very common sight, as a lot of tasks on the web take time and we don’t want to stop our entire program to wait for the response to come back.

Error handling is very important for awaited coroutines and tasks. We can catch errors using a try/except block.

try:
    # wait for the task to finish
    await task
except Exception as e:
    # ...

Running concurrent tasks

Imagine you have a set of separate tasks and each one takes a long time to finish. Their outputs aren’t dependent on each other so it would be convenient to run them all at once. If these tasks are executed synchronously the program will have to wait for each task to finish before starting the next one. To speed up this completion of tasks you can run concurrent tasks!

import asyncio

async def fast_task(task):
    print(f"before {task}")
    await asyncio.sleep(1)
    print(f"after {task}")

async def slow_task(task):
    print(f"before {task}")
    await asyncio.sleep(2)
    print(f"after {task}")

async def main():
    await fast_task('fast task')
    await slow_task('slow task')

asyncio.run(main())
# prints:
# before fast task
# after fast task
# before slow task
# after slow task

This example waits for the fast_task() coroutine to finish so it executes in 1 second, and then executes the slow_task() coroutine after waiting for 2 seconds. To make the coroutines run concurrently, we can create tasks using the asyncio.create_task() function.

import asyncio

async def fast_task(task):
    print(f"before {task}")
    await asyncio.sleep(1)
    print(f"after {task}")

async def slow_task(task):
    print(f"before {task}")
    await asyncio.sleep(2)
    print(f"after {task}")

async def main():
    task1 = asyncio.create_task(fast_task(1, 'fast task'))
    task2 = asyncio.create_task(slow_task(2, 'slow task'))
    await task1
    await task2

asyncio.run(main())
# prints:
# before fast task
# before slow task
# after fast task
# after slow task

The above code shows that slow task is no longer waiting for fast task to finish before running but running concurrently. The asyncio.create_task() wraps the fast_task and slow_task function and makes it run the coroutines concurrently as an asynchronous task. This means that the tasks are executed much faster than before.

Another way to make multiple coroutines run concurrently is to use asyncio.gather(). This function was used in the first example and it takes coroutines as arguments and runs them concurrently.

Chaining coroutines

Independent tasks can be run concurrently but what if we want to pass data from one coroutine to another? Coroutines can be chained together which allows you to break programs up. In the example below, task_1 takes in a number and adds 2, it waits a random amount of time before returning. task_2 takes in a number returned from task_1 and multiplies it by 2 and also waits a random amount of time before returning. An array of numbers are iterated through both tasks. Try running the following example.

import asyncio
import random

async def task_1(n: int):
    # adds 2
    i = random.randint(0, 5)
    print(f"Task 1 with {n} is sleeping for {i} seconds.")
    await asyncio.sleep(2)
    result = n + 2
    print(f"Returning task 1 result, which is {result}")
    return result

async def task_2(n: int):
    # multipies by 2
    i = random.randint(0, 5)
    print(f"Task 2 with {n} is sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = n * 2
    print(f"Returning task 2 result, which is {result}")
    return result

async def chain(n: int):
    p1 = await task_1(n)
    p2 = await task_2(p1)
    print(f"Chained result {n} => {p2}.")

async def main():
    await asyncio.gather(*(chain(n) for n in [1,2,3]))

asyncio.run(main())
 

Pay careful attention to the output, when task_1() sleeps for a variable amount of time, task_2() begins working with the results as they become available.

Queue

Asyncio queues are designed to be used specifically in async/await code. Queues can be used to distribute workload between several concurrent tasks. Methods of asyncio queues don’t have a timeout parameter and so the asyncio.wait_for() function should be used to complete a queue operation timeout.

In the example above, a queue structure isn’t needed as each set of coroutines explicitly awaits each other. However, if there are a group of tasks that get added and there are a group of workers who complete the tasks from the queue at random times then a queue is necessary. In this example, there is no chaining of any particular task to a worker. The number of tasks is not known by the workers.

The synchronous version of this program would be very inefficient: the tasks would be added to the queue one at a time and then after all of the tasks are added, only then can the workers start one by one to complete them. Tasks may sit idly in the queue rather than be picked up and processed immediately.

Below is the following example of using a queue from the asyncio documentation, try running it:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

When this example is run it can be seen how the workers pick up the tasks from the queue and work on them concurrently. Once the task is done they pick up a new one and this continues until all the tasks are completed in the queue.

Futures

Future objects are awaitable however, unlike coroutines, when a future is awaited it does not block the code being executed and it can be awaited multiple times. It represents a process ongoing somewhere else that may or may not have finished. Future objects have a boolean attribute called done which when true will either return a result or an exception. Future objects f have the following properties:

  • f.done() returns True if the process has finished and False if not
  • f.exception() raises an asyncio.InvalidStateError exception if the process has not finished. If the process has finished it returns the exception it raised, or None if it terminated without raising an exception
  • f.result() raises an asyncio.InvalidStateError exception if the process has not finished. If the process has finished it returns the value the process returned, or the exception the process raised if there was one.

You can create your own Future by calling:

f = asyncio.get_running_loop().create_future()

This example shows the creation of a future object, and the creation and scheduled asynchronous task to set the result for the future, and waits until the future has a result:

async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(set_after(fut, 1, '... world'))

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)

asyncio.run(main())

It’s important to note that a future that is done can’t change back into one that is not yet done. A future becoming done is a one-time occurrence. You probably won’t create your own futures very often unless you are implementing new libraries that extend asyncio.