Asynchronous Programming
- Software design approaches and patterns, to identify reusable solutions to commonly occurring problems
- Apply an appropriate software development approach according to the relevant paradigm (for example object oriented, event driven or procedural)
This topic looks at concurrent function calls in Python.
Async Features in Python
Python is effectively single-threaded due to the Global Interpreter Lock (GIL). Since the GIL allows only one thread to execute Python bytecode at a time, a multiprocessing library is required to achieve true parallelism.
- Parallelism consists of performing multiple operations at the same time. Multiprocessing means spreading the workload over several CPUs or cores. It is well suited to CPU-bound scenarios (i.e. when you genuinely want to run code in parallel; see the sketch after this list).
- Concurrency is the process of multiple tasks being run in an overlapping manner. Concurrency does not imply parallelism.
- Threading is a form of concurrency where multiple threads take turns executing different tasks. Because of the GIL, Python has a complicated relationship with threading.
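As a minimal sketch of true parallelism for CPU-bound work, the standard multiprocessing library can spread a function over several processes (the square function and its inputs here are purely illustrative):
from multiprocessing import Pool

def square(n: int) -> int:
    # a stand-in for some CPU-heavy computation
    return n * n

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # each call to square() may run in a separate process, on a separate core
        print(pool.map(square, range(10)))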
There are threading and multiprocessing libraries in Python. This reading will focus on the asyncio library, which is best for IO-bound scenarios (e.g. making external calls via HTTP or to a database) rather than CPU-bound scenarios. asyncio event loops are single-threaded, and you need async-compatible libraries to truly benefit from the framework. For an example of this, see the HTTPX library, which has async support, or the motor library for working with MongoDB databases.
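As a rough illustration of what an async-compatible HTTP client looks like, here is a minimal HTTPX sketch (it assumes the httpx package is installed and uses a placeholder URL):
import asyncio
import httpx

async def fetch_status(url: str) -> int:
    # AsyncClient is the asynchronous counterpart of httpx.Client
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.status_code

print(asyncio.run(fetch_status("https://example.com")))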
Coroutines
What are coroutine functions, you ask? They are functions whose execution can be paused and resumed; calling one does not run it immediately, but instead schedules it on the event loop. The Python package asyncio is used to write concurrent code and will be used in all the examples. Asynchronous Python code uses two keywords, async and await, which help define and run coroutines. The asyncio.run() function starts the event loop and runs a coroutine, and asyncio.gather() runs multiple coroutines concurrently.
import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count())

asyncio.run(main())
# prints: One One Two Two
As you can see in the example above, both Two statements are printed after both One statements, because print("Two") only runs after asyncio.sleep(1) has finished. Note that if you were to define the function without the async keyword and use time.sleep() instead, it would print in a different order:
import time

def count():
    print("One")
    time.sleep(1)
    print("Two")

def main():
    for i in range(2):
        count()

main()
# prints: One Two One Two
This synchronous version executes in order and takes longer to run. This is because time.sleep() is a blocking function, whilst asyncio.sleep() is an asynchronous, non-blocking call.
async / await
The syntax async def creates a coroutine. The expressions async with and async for are also valid:
# use an async context manager, for example an aiohttp client session
async with aiohttp.ClientSession() as session:
    ...  # use the session
# the session is closed automatically on exit

# traverse an asynchronous iterator
async for item in async_iterator:
    print(item)
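For async for to work, the object being iterated over must be an asynchronous iterator. An async generator is the simplest way to create one; the ticker function below is a made-up illustration:
import asyncio

async def ticker(limit: int):
    # an async generator: yields a value, then hands control back to the event loop
    for i in range(limit):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for item in ticker(3):
        print(item)

asyncio.run(main())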
An object is called awaitable if it can be used with the await keyword. There are three main types of awaitable objects: coroutines, tasks, and futures. They are explained in more detail below. The keyword await passes function control back to the event loop. For example:
async def a():
    c = await b()
    return c
The await means that a() pauses at that line and only resumes when b() is ready. In the meantime, the event loop is free to run something else.
You can only use await in the body of coroutines. For example, the following will throw a syntax error.
def m(x):
    y = await z(x)
    return y
This is because the await is outside of an async def coroutine. This could be fixed by changing def to async def, as shown below.
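A corrected sketch (assuming z is itself a coroutine) would look like this:
async def m(x):
    # valid: await now appears inside an async def coroutine
    y = await z(x)
    return y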
A very common use-case of asynchronous functions and callbacks is HTTP requests, e.g. when fetching some data from another API:
import asyncio
import aiohttp

url = "https://example.com"  # a placeholder URL to fetch

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.read()
            print(f"Response received: {data}")

asyncio.run(main())
The response is only read once the HTTP request has successfully made its way across the network and back. This is a very common sight, as a lot of tasks on the web take time and we don't want to stop our entire program while waiting for the response to come back.
Error handling is very important for awaited coroutines and tasks. We can catch errors using a try/except block.
try:
    # wait for the task to finish
    await task
except Exception as e:
    ...  # handle the error here
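As a fuller sketch, the failing_job coroutine below is made up purely to show an exception propagating out of an awaited task:
import asyncio

async def failing_job():
    await asyncio.sleep(1)
    raise RuntimeError("something went wrong")

async def main():
    task = asyncio.create_task(failing_job())
    try:
        # the exception raised inside the task is re-raised here
        await task
    except RuntimeError as e:
        print(f"Caught error from task: {e}")

asyncio.run(main())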
Running concurrent tasks
Imagine you have a set of separate tasks and each one takes a long time to finish. Their outputs aren't dependent on each other, so it would be convenient to run them all at once. If these tasks are executed synchronously, the program has to wait for each task to finish before starting the next one. To speed things up, you can run the tasks concurrently!
import asyncio

async def fast_task(task):
    print(f"before {task}")
    await asyncio.sleep(1)
    print(f"after {task}")

async def slow_task(task):
    print(f"before {task}")
    await asyncio.sleep(2)
    print(f"after {task}")

async def main():
    await fast_task('fast task')
    await slow_task('slow task')

asyncio.run(main())
# prints:
# before fast task
# after fast task
# before slow task
# after slow task
This example waits for the fast_task() coroutine to finish (1 second) and then executes the slow_task() coroutine (a further 2 seconds), so the whole program takes about 3 seconds. To make the coroutines run concurrently, we can create tasks using the asyncio.create_task() function.
import asyncio

async def fast_task(task):
    print(f"before {task}")
    await asyncio.sleep(1)
    print(f"after {task}")

async def slow_task(task):
    print(f"before {task}")
    await asyncio.sleep(2)
    print(f"after {task}")

async def main():
    task1 = asyncio.create_task(fast_task('fast task'))
    task2 = asyncio.create_task(slow_task('slow task'))
    await task1
    await task2

asyncio.run(main())
# prints:
# before fast task
# before slow task
# after fast task
# after slow task
The above code shows that the slow task no longer waits for the fast task to finish before running; the two run concurrently. asyncio.create_task() wraps each of the fast_task and slow_task coroutines in a Task and schedules it on the event loop, so the coroutines run concurrently. This means the tasks complete much faster than before (about 2 seconds instead of 3).
Another way to make multiple coroutines run concurrently is to use asyncio.gather(). This function was used in the first example; it takes coroutines as arguments and runs them concurrently.
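For comparison, here is a sketch of the same fast and slow tasks run with asyncio.gather() instead of explicit create_task() calls; the output is interleaved in the same way:
import asyncio

async def fast_task(task):
    print(f"before {task}")
    await asyncio.sleep(1)
    print(f"after {task}")

async def slow_task(task):
    print(f"before {task}")
    await asyncio.sleep(2)
    print(f"after {task}")

async def main():
    # gather schedules both coroutines and waits until they have all finished
    await asyncio.gather(fast_task('fast task'), slow_task('slow task'))

asyncio.run(main())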
Chaining coroutines
Independent tasks can be run concurrently, but what if we want to pass data from one coroutine to another? Coroutines can be chained together, which allows you to break programs up into smaller steps. In the example below, task_1 takes in a number, adds 2 to it and waits a random amount of time before returning. task_2 takes the number returned from task_1, multiplies it by 2 and also waits a random amount of time before returning. An array of numbers is passed through both tasks. Try running the following example.
import asyncio
import random

async def task_1(n: int):
    # adds 2
    i = random.randint(0, 5)
    print(f"Task 1 with {n} is sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = n + 2
    print(f"Returning task 1 result, which is {result}")
    return result

async def task_2(n: int):
    # multiplies by 2
    i = random.randint(0, 5)
    print(f"Task 2 with {n} is sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = n * 2
    print(f"Returning task 2 result, which is {result}")
    return result

async def chain(n: int):
    p1 = await task_1(n)
    p2 = await task_2(p1)
    print(f"Chained result {n} => {p2}.")

async def main():
    await asyncio.gather(*(chain(n) for n in [1, 2, 3]))

asyncio.run(main())
Pay careful attention to the output: as each task_1() sleeps for a variable amount of time, task_2() begins working with the results as they become available.
Queue
Asyncio queues are designed to be used specifically in async/await code. Queues can be used to distribute workload between several concurrent tasks. Methods of asyncio queues don't have a timeout parameter, so the asyncio.wait_for() function should be used if a queue operation needs a timeout, as sketched below.
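A minimal sketch of a timed queue operation, assuming the queue and the timeout value are supplied by the surrounding code:
import asyncio

async def get_with_timeout(queue: asyncio.Queue, seconds: float):
    try:
        # cancel the get() if nothing arrives within the given number of seconds
        return await asyncio.wait_for(queue.get(), timeout=seconds)
    except asyncio.TimeoutError:
        return None  # nothing arrived in time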
In the chaining example earlier, a queue structure isn't needed because each set of coroutines explicitly awaits the others. However, if a group of tasks gets added to a queue and a group of workers picks tasks off the queue at random times, then a queue is necessary. In that case no particular task is chained to a particular worker, and the workers don't know how many tasks there are.
The synchronous version of this program would be very inefficient: the tasks would be added to the queue one at a time, and only after all of them had been added could the workers start, one by one, to complete them. Tasks may sit idly in the queue rather than being picked up and processed immediately.
Below is an example of using a queue from the asyncio documentation; try running it:
import asyncio
import random
import time

async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()

    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())
When this example is run you can see how the workers pick up tasks from the queue and work on them concurrently. Once a task is done they pick up a new one, and this continues until all the tasks in the queue are completed.
Futures
Future objects are awaitable; however, unlike coroutines, a future can be awaited multiple times, and awaiting a future that is already done returns immediately. A future represents a process running somewhere else that may or may not have finished. A future's done() method reports whether it has finished, at which point it holds either a result or an exception. A Future object f has the following methods (see the sketch after this list):
- f.done() returns True if the process has finished and False if not
- f.exception() raises an asyncio.InvalidStateError exception if the process has not finished. If the process has finished, it returns the exception it raised, or None if it terminated without raising an exception
- f.result() raises an asyncio.InvalidStateError exception if the process has not finished. If the process has finished, it returns the value the process returned, or raises the exception the process raised if there was one
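A minimal sketch of these methods in action, setting the result by hand rather than from a real background process:
import asyncio

async def main():
    fut = asyncio.get_running_loop().create_future()
    print(fut.done())       # False: nothing has finished yet
    fut.set_result(42)
    print(fut.done())       # True: a result has been set
    print(fut.result())     # 42
    print(fut.exception())  # None: it finished without raising

asyncio.run(main())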
You can create your own Future by calling:
f = asyncio.get_running_loop().create_future()
This example creates a Future object, creates and schedules an asynchronous task to set the result of the future, and then waits until the future has a result:
import asyncio

async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(set_after(fut, 1, '... world'))

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)

asyncio.run(main())
It’s important to note that a future that is done can’t change back into one that is not yet done. A future becoming done is a one-time occurrence. You probably won’t create your own futures very often unless you are implementing new libraries that extend asyncio.