asyncio: a library with too many sharp corners

A red garden tulip (Tulipa gesneriana). Photo by my friend Jamie.

One of the headliner features of Python 3.4 (released in 2014) was a new library in the standard library: asyncio, provisionally introduced for feedback as an import of the external tulip library. In Python 3.5 (released in 2015), async and await were added as keywords to the language specifically for usage with asynchronous libraries, replacing the usage of yield from. The asyncio module was also made non-provisional in this release, heralding an entire new ecosystem of asynchronous libraries in the Python world.

But asyncio has so many sharp corners and design issues that it is far too difficult to use, bordering on fundamentally broken. Some of these were only realised in hindsight, because other languages (Kotlin, Swift) and libraries did what asyncio does significantly better; but most of these issues were bad at release, and it is baffling how the library made it out of provisional status with such glaring flaws.

I mention the Trio library a lot in this post, but there's also the AnyIO library that implements Trio-like semantics on top of asyncio, fixing most of the issues described here whilst retaining a level of compatibility with regular asyncio libraries.

Contents

  1. Major Problem #1: Cancellation is broken
  2. Major Problem #2: Task was destroyed but it is pending!
  3. Major Problem #3: I/O has pointless landmines
  4. Major Problem #4: asyncio.Queue is difficult to use
  5. Less Major Problems, a collection

Major Problem #1: Cancellation is broken

In the traditional model of concurrent programming using threads, there is no clean way to do cancellation. In the standard pthreads model, the only way to cancel a thread is to brutally murder it with pthread_cancel, which is nearly always a bad idea because anything the thread was using (such as locks) will be left in an unknown and inconsistent state if the thread was killed in the middle of an operation. If you want a better mechanism, you need to implement it yourself by constantly polling a shared state object in between doing work, using a loop like so:

import threading

cancelled = threading.Event()

def do_work():
    ...  # one unit of work

def t1():
    # Only stops when it explicitly checks the flag between work items.
    while not cancelled.is_set():
        do_work()

def main():
    t = threading.Thread(target=t1)
    t.start()
    # ... do something in between
    cancelled.set()
    t.join()

This is unergonomic and error-prone: only threads that opt in to this cancellation mechanism can be cancelled, and only when they explicitly check whether they have been. Some languages (e.g. Java) make this a bit better with a Thread.interrupt() method that handles communicating the interrupted state, with most standard library blocking calls such as Object.wait() automatically checking for it. (This still falls victim to the other issues described here.)

asyncio is an asynchronous runtime and is responsible for its own scheduling of its own tasks, instead of the kernel. When an asyncio task needs to interact with the system, it asks the event loop to suspend it until an operation is complete whereupon the task will be rescheduled and will run again in the next tick of the event loop. Threads do the same, but with system calls, and the user application has no control over the kernel's scheduler beyond tweaking some tuning parameters.

This scheduling mechanism is reused to implement cancellation. When a task is cancelled, any pending operation that the event loop was performing for it is abandoned, and the call raises a CancelledError instead. Unlike with threads, tasks no longer need to check if they have been cancelled; every time a call drops into the event loop, the runtime itself checks for cancellation. Conceptually, you can imagine every await as a cancellation point:

async def something(stream: SomeLibraryStream):
    while True:
        result = await stream.read()  # Cancellation point
        parsed = do_parse(result)     # *not* a cancellation point

From this, we can derive a conceptual model of how tasks and cancellations interact:

  1. Tasks run until an await, at which point they suspend.
  2. Something else calls task.cancel(), which reschedules the task again.
  3. The function that was being await-ed now raises CancelledError.
  4. This exception propagates backwards, unwinding through all functions in the call stack and cleaning up as it goes.

This avoids both problems with threads: tasks can be externally killed without worrying about resources not being torn down, and end-user tasks don't need to constantly check if they've been cancelled because the event loop does it for you.
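
Here's a minimal, runnable sketch of that conceptual model in action (the prints are only there to show the order of events):

import asyncio

async def worker():
    try:
        await asyncio.sleep(3600)          # suspended at an await...
    except asyncio.CancelledError:
        print("unwinding, cleaning up")    # ...which raises when cancelled
        raise                              # re-raise so the task ends as cancelled

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)                 # let worker start and suspend
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task finished as cancelled")

asyncio.run(main())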

But that's not how it works

Consider the function below, which returns a resource wrapped in an asynchronous context manager. When the user is done, it needs to clean up some resources (say, a server needs a clean close). This cleanup should happen regardless of whether the code running inside the context manager was successful, so it's run inside a finally block:

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

# Sock and connect_socket are stand-ins for whatever socket wrapper you use.

@asynccontextmanager
async def connect_to_server(ip: str, *, port: int = 6767) -> AsyncIterator[Sock]:
    sock = await connect_socket(ip, port)

    async with sock:
        await sock.send(b"IDENTIFY ident :bar\r\nNICKNAME :gquuuuuux\r\n")
        try:
            yield sock
        finally:
            await sock.send(b"QUIT :died to some small fry")

In this case, let's say .send() waits for some form of acknowledgement message. There's also another task that is spawned somewhere, and it's sending a PING message to the server every few seconds and expecting a PONG message. If the client goes too long without receiving a PONG, it cancels the task inside the context manager and exits itself.

What happens if the server does stop responding, and the task is cancelled? Let's see:

  1. First, any code in the user function running inside the asynchronous context manager is cancelled, with a CancelledError bubbling upwards.

  2. Next, the yield sock expression raises a CancelledError, and control flows into the finally block.

  3. The code enters the sock.send() function, which re-enters the event loop. The event loop completely forgets that the task was cancelled and is entirely happy to deadlock the application forever waiting for the server to respond to the .send() (which will never happen).

This is because cancellations in asyncio are edge-triggered, not level-triggered. These concepts come from the world of electronics, but they apply to certain kinds of programming too: an edge-triggered event fires only once, when the state changes. In this case, it's Task.cancel() firing a CancelledError exactly once. This is the opposite of level-triggered cancellation, where cancelling a task causes every subsequent call into the event loop to raise a CancelledError, forever.

Here's a more practical example that you can run directly on your computer to see this behaviour.

import asyncio

event = asyncio.Event()

async def fn():
    try:
        event.set()
        await asyncio.sleep(60)
    finally:
        await asyncio.sleep(5)
        print("slept for 5s")

async def main():
    task = asyncio.create_task(fn())
    await event.wait()
    task.cancel()
    await asyncio.sleep(10)

asyncio.run(main())

When you run this, the first sleep(60) will be cancelled, and then the program will sleep for five more seconds before printing a slept for 5s message because the cancellation disappeared.

This is absolutely 100% the wrong behaviour, and it makes cancellation dangerous because it can be swallowed or covered up at any point.

  • Using a bare except:? Swallows cancellations (see the sketch after this list). People will lie and say that they don't write these, but people do use bare excepts. Even if you don't, do you know that every library you depend on doesn't?

  • Doing cleanup in __aexit__? Can deadlock waiting for something that will never happen, swallowing the cancellation.

  • Doing cleanup in try/finally? See above.
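
As an illustration of the first point: CancelledError inherits from BaseException (since Python 3.8), and a bare except catches those too, so a hypothetical task like this one will survive task.cancel() forever:

async def stubborn():
    while True:
        try:
            await asyncio.sleep(60)
        except:    # catches CancelledError along with everything else
            pass   # the cancellation is silently discarded; the loop continues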

It could be better

Graceful asynchronous cleanup is intrinsically a difficult problem: if an operation blocks for too long, what do you do? If you adopt a rigid rule of always trying to be graceful, you risk deadlocks when the operation never returns. If you avoid doing anything gracefully and just sever connections and open files with a machete, you can end up with half-written data or some very unhappy servers on the other end. Not that it matters in the asyncio world, because the library doesn't give you the tools to implement either policy.

The Trio library takes the opposite approach; all cancellations are level-triggered. Let's port the sleeping example above to use Trio instead:

import trio

event = trio.Event()

async def task():
    try:
        event.set()
        await trio.sleep(60)
    finally:
        await trio.sleep(5)
        print("slept for 5s")

async def main():
    async with trio.open_nursery() as n:
        n.start_soon(task)
        await event.wait()
        n.cancel_scope.cancel()
        await trio.sleep(10)  # Not needed, but for parity with the previous example.

trio.run(main)

Running this will produce... no output. It won't wait, either, because anything that could wait has been cancelled. If you add a print() between the event.wait() and the cancel_scope.cancel(), that will print, so the program isn't exiting early without running anything.

This raises a question: how do you do graceful cleanup? With shielded cancel scopes and timeouts. I'll replace the finally block above with one of those:

    finally:
        with trio.move_on_after(1, shield=True):
            await trio.sleep(5)
        print("slept for 1s?")
        await trio.sleep(5)
        print("slept for 5s?")

Running this will print slept for 1s?, but nothing more. The code inside the shielded scope ignored the outside cancellation, but was re-cancelled after a second anyway. This once again nets you the best of both worlds: cancellations aren't swallowed unless you explicitly opt in. Remember the Zen of Python: explicit is better than implicit.

Major Problem #2: Task was destroyed but it is pending!

If you've ever used an asyncio application, you've probably seen that message pop up before. As an example, if I Ctrl-C portage too quickly, it spits out a few of those errors. Why? Because asyncio does not keep strong references to tasks. Quoting the official documentation:

Important

Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done. For reliable “fire-and-forget” background tasks, gather them in a collection:
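
The collection the documentation is alluding to looks something like this (some_coro standing in for any coroutine of yours):

background_tasks = set()

task = asyncio.create_task(some_coro())
background_tasks.add(task)                        # keep a strong reference
task.add_done_callback(background_tasks.discard)  # drop it once it finishes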

Let's take some example code:

import asyncio, gc


async def expose_bugs():
    while True:
        await asyncio.sleep(0.5)
        # Simulate doing work that would have the GC fire.
        gc.collect()


async def has_bug():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await fut


async def main():
    t = asyncio.create_task(expose_bugs())
    asyncio.create_task(has_bug())  # note: no reference is kept
    await asyncio.sleep(5)


asyncio.run(main())

If you run this, it will print a warning to stderr about how has_bug was destroyed while it was pending. has_bug has no strong references to it, so when the GC runs, the weak reference the event loop holds is cleared and the task is dropped on the floor. Goodbye, has_bug.

This is very obviously insane behaviour, but it can mostly be avoided by always holding references to spawned tasks (similar to how you can avoid segmentation faults by always doing bounds checking). But it gets worse. There's a set of helper functions used for corralling tasks: wait_for, gather, and shield. These can all cause a function being waited on to be dropped on the floor, because they internally spawn said function as a task and wait on that instead:

import asyncio, gc


async def expose_bugs():
    while True:
        await asyncio.sleep(0.5)
        # Simulate doing work that would have the GC fire.
        gc.collect()


async def has_bug():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await fut


async def shield_task():
    await asyncio.shield(has_bug())


async def main():
    t1 = asyncio.create_task(expose_bugs())
    t2 = asyncio.create_task(shield_task())
    # scheduling pass
    await asyncio.sleep(1)
    t2.cancel()
    await asyncio.sleep(2)


asyncio.run(main())

When t2 is cancelled, the outer await asyncio.shield(...) call is cancelled. The cancellation doesn't propagate through into has_bug because of the shielding, and the outer task still has a strong reference in the form of t2. But has_bug's task has no strong references to it; the only reference was in the local variables of the shield() function. The next time the event loop ticks, gc.collect() is called, which drops the has_bug task entirely.

You might try to avoid this by calling create_task explicitly, which keeps a strong reference to the has_bug() task in the local variables of the (cancelled) shield_task coroutine, like so:

async def shield_task():
    inner = asyncio.create_task(has_bug())
    await asyncio.shield(inner)

But this only works as long as the handle to t2 lives inside main(). If that handle gets dropped, then the inner has_bug task will also get dropped! Adding a del t2 after the t2.cancel() exposes this immediately. Good luck tracking this through a web of classes and tasks.

Major Problem #3: I/O has pointless landmines

The underlying API for performing network I/O is the ever-venerable BSD socket API. Python exposes a nice object-based API for working with sockets; let's look at some code that opens a connection on a socket and sends some data.

import socket

s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.connect(("2001:708:40:2001::11ba", 6667))
s.send(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
motd = s.recv(4096)
s.shutdown(socket.SHUT_RDWR)  # Graceful close, send an EOF
s.close()

This is pretty bare-bones, but it's easy to see how the code flows: top to bottom. It's a set of linear statements:

  1. Create the socket with the appropriate options.
  2. Connect it to an address directly.
  3. Send some data from a socket.
  4. Receive some data from the socket.
  5. Shut it down and close the socket.

This all happens in order; it's simple to follow. Trio offers an asynchronous version of this, so let's rewrite the code to be identical with Trio sockets:

s = trio.socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
await s.connect(("2001:708:40:2001::11ba", 6667))
await s.send(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
motd = await s.recv(4096)
s.shutdown(socket.SHUT_RDWR)  # Graceful close, schedules an EOF
s.close()

The code is almost identical, with some await statements introduced before every function that would normally block. Again, the control flow is simple; it flows from top to bottom in a linear fashion. Let's look at asyncio's version of sockets, which are called protocols:

import asyncio
from functools import partial


class IdentifyProtocol(asyncio.Protocol):
    def __init__(self, message: bytes, motd_future: asyncio.Future):
        self.message = message
        self.motd = motd_future

    def connection_made(self, transport: asyncio.WriteTransport):
        # `message` is already bytes, so write it directly
        transport.write(self.message)

    def data_received(self, data: bytes):
        self.motd.set_result(data)

    def connection_lost(self, exc: BaseException | None):
        ...


loop = asyncio.get_running_loop()
fut = loop.create_future()
transport, protocol = await loop.create_connection(
    partial(IdentifyProtocol, b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n", fut),
    "2001:708:40:2001::11ba",
    6667,
)

motd = await protocol.motd

Unlike regular BSD sockets or Trio's socket wrappers, asyncio uses callbacks - synchronous ones, at that - to implement the low-level I/O primitives. The control flow here jumps around a lot:

  1. create_connection is equivalent to socket.socket + socket.connect. Okay. You don't get to set socket options (at least it sets TCP_NODELAY) and it doesn't work for anything other than regular AF_INET/AF_INET6 sockets.

    It returns a tuple of (write transport, protocol instance); the former can be used to send further data (synchronously).

  2. When the socket is opened, it jumps into my class and calls (synchronously) connection_made, providing a "transport" on which I can call the (synchronous) write method to send my authentication message.

    There's no way to wait for this to be sent as WriteTransport.write is synchronous. It'll get sent at some point in the future. Maybe. If you want to let somebody know that you've sent the message, you'll need to implement that yourself too.

  3. After some time, the server will respond; the event loop calls (synchronously) data_received, providing the received data. If you want to do something with this data (asynchronously), you need to pass it to the outside world yourself using futures or queues. In this case, I've implemented it with a regular Future; I haven't even thought about how to swap the future out in a non-error-prone way for subsequent reads.

  4. The outside world now reads the data from the future. That's three separate places I've had to deal with the data, versus a single place with a linear order for sockets.

The biggest difference between raw sockets and protocols is that protocols have their incoming data pushed at you. If you simply want to wait for data to arrive, you need to implement that yourself! And this is only a basic protocol; more complex protocols require implementing more complicated synchronisation mechanisms by hand to communicate with the entirely synchronous protocol callbacks, leading to a mess of either create_task everywhere or manually shuffling futures and events around.
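
To make that concrete, here's a minimal sketch of the sort of bridging code you end up writing; the names (QueueBridgeProtocol, read_some) are mine, not asyncio's:

import asyncio

class QueueBridgeProtocol(asyncio.Protocol):
    def __init__(self):
        self.incoming: asyncio.Queue[bytes] = asyncio.Queue()

    def data_received(self, data: bytes):
        # synchronous push-based callback -> pull-based queue
        self.incoming.put_nowait(data)

    def connection_lost(self, exc: BaseException | None):
        self.incoming.put_nowait(b"")  # empty bytes as an EOF sentinel

async def read_some(proto: QueueBridgeProtocol) -> bytes:
    # what sockets and streams give you for free
    return await proto.incoming.get()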

Why is it like this? Because Twisted was like this. But Twisted existed in a world before yield from or await, so it has an excuse. asyncio copied it in a world with yield from and await, so it has no excuse.

And no, the answer is also not "because Windows doesn't support a Unix-style select() API properly". If you want select() semantics on Windows, use \Device\Afd like everyone else does (and by everyone else, I mean the entire JavaScript and Rust ecosystems).

That's not fair

That's true. It's rare that you'll actually interact with protocols; they are a weird implementation detail of asyncio's event loop mechanisms. The same goes for Trio sockets, but at least for sockets you can use them for esoteric mechanisms like AF_NETLINK or SOCK_RAW whilst still retaining the nice asynchronous API. (You can implement those socket types on asyncio with the even lower level APIs of add_{reader|writer}, but that's not a topic for today).

Instead most asyncio and Trio programs will use streams, a high-level generic API that treats network connections as nothing more than a stream of bytes. Here's how the previous socket example would be written using Trio's streams:

async with trio.open_tcp_stream("irc.libera.chat", port=6667) as stream:
    # type: trio.SocketStream
    await stream.send_all(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")

This is very simple; the returned stream works as an asynchronous context manager that automatically closes the socket when done, regardless of whether the inner code succeeds or fails. The send_all method will automatically retry when the underlying socket returns a partial write, so the user doesn't need to implement retry logic for partial writes by hand.
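
For comparison, this is roughly the retry loop that send_all implements for you, sketched on top of the raw Trio socket s from earlier:

data = b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n"
while data:
    sent = await s.send(data)  # a raw send may only write part of the buffer
    data = data[sent:]         # retry with whatever was left over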

Here's how you do it in asyncio:

reader, writer = await asyncio.open_connection("irc.libera.chat", port=6667)
try:
    writer.write(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
    await writer.drain()
finally:
    writer.close()
    await writer.wait_closed()

This is similar to the Trio example with two major differences:

  • writer.write is synchronous and does not actually perform a full write unless drain() is called.
  • writer.close does not actually perform a close, only schedules it, and you need to use wait_closed to ensure the stream is closed.

    Also, wait_closed will block if the drain method is cancelled. The cancellation issues are everywhere.

The write/drain pair exists entirely as a footgun for anyone who forgets to call drain. Data may get written in the background if you don't call drain(), but if you're in a tight loop with lots of data to write and no other await calls, all of that data piles up in the stream's internal buffer without being sent. Even if the writer task does get rescheduled, the buffer may still fill up anyway if data is written faster than the background writer can flush it. This is stupid!
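
A sketch of that failure mode, with chunks standing in for any large pile of outgoing data:

# the footgun: nothing here yields, so every chunk is buffered in memory
for chunk in chunks:
    writer.write(chunk)

# the fix you have to remember: drain inside the loop
for chunk in chunks:
    writer.write(chunk)
    await writer.drain()   # waits when the buffer is over the high-water mark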

It's a similar situation with close/wait_closed: close() only schedules a close, and wait_closed waits for that close to actually happen. What happens if wait_closed is cancelled? asyncio doesn't really define the semantics for this, unlike the Trio world, which very explicitly does. In the Trio world, all closeable objects follow the AsyncResource ABC, which defines an idempotent aclose method that must always succeed.

So what happens for protocols such as TLS that need a graceful goodbye message sent? Trio's SSL helpers will try and send a graceful close, and if that times out the stream will be severed by force instead. The end-user doesn't need to know anything about this; they can call aclose on a resource to close it and not worry about if it will be cancelled or if the resource is actually closed.
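
Here's a sketch of what that pattern looks like if you write it yourself against Trio's AsyncResource contract; GracefulConn and the goodbye message are my invention, not Trio's:

import trio

class GracefulConn(trio.abc.AsyncResource):
    def __init__(self, stream: trio.abc.Stream):
        self._stream = stream

    async def aclose(self):
        try:
            # shielded so an outer cancellation can't interrupt the goodbye,
            # with a deadline so a dead peer can't hang us either
            with trio.move_on_after(5, shield=True):
                await self._stream.send_all(b"QUIT :goodbye\r\n")
        finally:
            # whatever happens, we end with the resource actually closed
            await self._stream.aclose()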

Major Problem #4: asyncio.Queue is difficult to use

I have two tasks: a producer (that makes messages) and a consumer (that eats messages). Here they are:

async def producer():
    while True:
        message = await do_some_networking_thing()
        ...  # i don't know how to send a message

async def consumer():
    while True:
        message = ...  # i don't know how to receive a message
        await eat(message)

How do I get messages between them? I could use a Future, but that would only work exactly once and both of these functions are running in a loop. I could find a way to ferry Future instances between them, but if I could do that I would use the ferry to communicate the messages instead.

The solution is an asyncio.Queue, which is the asynchronous version of queue.Queue (which is the Python version of java.util.concurrent.ArrayBlockingQueue). Let's pass a queue to both functions:

async def producer(queue: asyncio.Queue):
    while True:
        message = await do_some_networking_thing()
        await queue.put(message)

async def consumer(queue: asyncio.Queue):
    while True:
        message = await queue.get()
        await eat(message)

async def main():
    queue = asyncio.Queue()
    t1 = asyncio.create_task(producer(queue))
    t2 = asyncio.create_task(consumer(queue))

    while True:
        await asyncio.sleep(99999999)

asyncio.run(main())

This will have the producer loop forever creating items and putting them in the queue, and the consumer will loop forever reading items from the queue and doing something with them. This is a very common pattern, similar to communicating sequential processes. But what happens if consumer throws an exception in eat? Let's go over the control flow:

  1. producer produces an item and sends it to the queue.
  2. consumer receives an item and calls eat.
  3. eat raises an exception and the consumer task dies. For the sake of understanding, this exception is a transient external exception and is not related to either the code or the item being consumed.
  4. producer produces an item and sends it to the queue.
  5. producer produces an item and sends it to the queue.
  6. producer produces an item and sends it to the queue.
  7. Your system locks up as the out-of-memory killer fails to run.

This is because the consumer exerts no backpressure on the producer; the producer will gladly keep sending items into the queue forever that nobody is listening to. I can add some backpressure by using asyncio.Queue(maxsize=1), which changes the control flow like so:

  1. producer produces an item and sends it to the queue.
  2. consumer receives an item and calls eat.
  3. eat raises an exception and the consumer task dies.
  4. producer produces an item and sends it to the queue.
  5. producer produces an item, tries sending it to the queue, but blocks forever because there's nobody reading from the queue.

That's a little bit better in the sense that it won't leak memory forever, but instead it will lock up forever, because the producer has no way of knowing that the consumer isn't listening anymore. In Python 3.13 the Queue.shutdown method was added, which lets one side (or both) know that the queue is closed and can't accept (or supply) any new items. Let's adjust the code to use that:

If you're stuck on Python 3.12 or earlier, there's no Queue.shutdown available.

async def consumer(queue: asyncio.Queue):
    while True:
        message = await queue.get()

        try:
            await eat(message)
        except:
            queue.shutdown()
            raise

Now the control flow goes as follows:

  1. producer produces an item and sends it to the queue.
  2. consumer receives an item and calls eat.
  3. eat raises an exception and the consumer task dies.
  4. producer produces an item and tries sending it to the queue, but fails because the queue is shut down.

Except... that's not true. There's a race condition between steps three and four: if producer puts an item into the queue before the consumer task is killed, then that item remains in the queue forever. There's a pair of methods, join() and task_done(), that can solve this, meaning my code now looks like this:

async def producer(queue: asyncio.Queue):
    while True:
        message = await do_some_networking_thing()
        await queue.put(message)
        await queue.join()

async def consumer(queue: asyncio.Queue):
    while True:
        try:
            message = await queue.get()
            await eat(message)
        except:  # bare, so a cancellation also shuts the queue down
            queue.shutdown(immediate=True)
            raise
        else:
            queue.task_done()

And the control flow goes as follows:

  1. producer produces an item and sends it to the queue.
  2. producer begins blocking until the consumer calls task_done.
  3. consumer receives an item and calls eat.
  4. eat raises an exception and the consumer task dies. The queue is shut down.
  5. queue.join wakes up because I passed immediate=True. If I didn't pass that, it would block forever instead.
  6. producer produces an item and tries sending it to the queue, but put fails because the queue is shut down.

This eliminates the race condition entirely. It isn't a very useful pattern yet, though: with one consumer and one producer, it can be reduced to just calling the consumer function from the producer. It becomes more useful if I add a second consumer, assuming consumers are slower than the producer:

  1. producer produces an item and sends it to the queue.
  2. producer begins blocking until the consumer calls task_done.
  3. Consumer task 1 receives an item and calls eat.
  4. Consumer task 2 sits there idly because the producer can't do anything until the first consumer task has finished.
  5. Consumer task 1 has an exception, and shuts down the queue.
  6. Consumer task 2 has an exception because the queue was closed.
  7. Everything explodes in a fiery mess of exceptions.

To fix this, consumer task 1 won't shut down the queue, but will instead exit and be restarted, perhaps by an external supervisor.

async def consumer(queue: asyncio.Queue):
    while True:
        try:
            message = await queue.get()
            await eat(message)
        except Exception:
            logger.exception("consumer died")
            return
        else:
            queue.task_done()

Let's look at the control flow for a final time:

  1. producer produces an item and sends it to the queue.
  2. producer begins blocking until the consumer calls task_done.
  3. Consumer task 1 receives an item and calls eat.
  4. Consumer task 2 sits there idly because the producer can't do anything until the first consumer task has finished.
  5. Consumer task 1 has an exception, and returns.
  6. Consumer task 2 remains blocking on get.
  7. producer continues blocking on join.
  8. The freshly rebooted consumer task 1 starts blocking on get.

This could be fixed by making the first consumer task try and re-insert an item on an exception, but what happens if the second task has had an error? Deadlocks. At this point, I give up and pull in an AMQP server instead of dealing with in-library queues.

It doesn't have to be this way

What I'm really looking for is a combination of the following:

  • A queue that blocks until a receiver has retrieved the item (aka, automatic .join()).
  • A queue that can be cloned and independently closed without affecting other consumers.

Trio's channels implement these behaviours. Let's re-write the consumer/producer pattern to use channels:

async def producer(channel: trio.MemorySendChannel[Message]):
    async with channel:
        while True:
            message: Message = await do_some_networking_thing()
            await channel.send(message)


async def consumer(channel: trio.MemoryReceiveChannel[Message]):
    async with channel:
        while True:
            result = await channel.receive()

            try:
                await do_something(result)
            except Exception:
                logger.exception("consumer died")
                return
            

async def main():
    send, receive = trio.open_memory_channel[Message](max_buffer_size=0)

    async with trio.open_nursery() as n:
        for _ in range(5):
            consumer_channel = receive.clone()
            n.start_soon(partial(consumer, consumer_channel))

        n.start_soon(partial(producer, send))

Trio channels with a buffer size of zero act as transfer queues, a name coined by Java 7 (released in 2011 (!!)), where the sender always waits for a receiver to take a message from the channel. Each receiver gets its own clone of the channel, and each message is handed to exactly one waiting receiver. These clones can be closed independently without affecting the other clones; only once the final receive channel is closed will the sending channel begin raising errors. TransferQueue was created four solid years before asyncio existed. I really see no excuse for this behaviour to have existed when asyncio was being developed.

The only problem this doesn't solve is that if the consumer has an error after receiving an object, that object stays unprocessed. This is a problem with both implementations, and channels don't (yet) fix it; but there's nothing in the conceptual model that would prevent some form of RetryingChannel class that blocks the producer until an item is eventually processed. The same can't really be said of Queues, which will always buffer at least one item no matter what you do.
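
To gesture at what such a thing could look like, here's a deliberately naive sketch (AckChannel is hypothetical, not a Trio API): send only returns once a consumer has finished processing the item, and a failing consumer hands the item back for someone else to retry.

import trio

class AckChannel:
    def __init__(self):
        self._send, self._recv = trio.open_memory_channel(0)

    async def send(self, value):
        done = trio.Event()
        await self._send.send((value, done))
        await done.wait()  # block until some consumer finished processing

    async def process_one(self, fn):
        value, done = await self._recv.receive()
        try:
            await fn(value)
        except Exception:
            # hand the item back for a healthier consumer to retry
            await self._send.send((value, done))
            raise
        done.set()

This assumes at least one healthy consumer remains; with none left, the producer simply blocks, which is at least an explicit, debuggable state rather than a silently growing buffer.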

A more detailed look at all the issues with backpressure can be read in this post by the Trio creator.

Less Major Problems, a collection

Whilst those four areas are some of the worst parts of asyncio, there are a lot of minor warts that make it unpleasant to use everywhere else.

Threads are stupid

It is an inevitability that asynchronous code needs to use threads for computationally intensive code or for libraries that still use blocking I/O. asyncio offers two APIs for this:

  • asyncio.to_thread, which propagates context variables correctly to worker threads but doesn't let you specify the concurrent.futures.ThreadPoolExecutor to use.

  • loop.run_in_executor, which doesn't propagate context variables but does let you specify the ThreadPoolExecutor to use; to get context propagation, you need to wrap every function you pass in a Context.run call yourself (see the sketch after this list).
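
Gluing the two halves together yourself looks roughly like this; call_blocking and the executor are my names, but the Context.run trick is the same one asyncio.to_thread performs internally:

import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def call_blocking(fn, *args):
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    # run fn inside a copy of the current context, in an executor we chose
    return await loop.run_in_executor(executor, ctx.run, fn, *args)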

This trade-off is very niche but it also doesn't really need to exist. The more important problem with threads comes from calling back into the event loop from a thread; cancellation does not propagate properly! Take this example:

import asyncio
from functools import partial


async def coro():
    await asyncio.sleep(5)
    print("as if i would be cancelled!")


def in_thread(loop: asyncio.AbstractEventLoop):
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)
    fut.result()


async def main():
    t = asyncio.create_task(asyncio.to_thread(partial(in_thread, asyncio.get_running_loop())))
    await asyncio.sleep(0)
    t.cancel()
    await asyncio.sleep(7)


asyncio.run(main())

Running this will print as if i would be cancelled!, because cancelling the to_thread task does not cancel the coroutine the thread scheduled back onto the event loop. Let's look at how Trio does it:

from functools import partial

import trio
import trio.from_thread
import trio.to_thread


async def async_task():
    await trio.sleep(5)
    print("looks like I survived being cancelled")
    return 1


def sync_task():
    try:
        ret = trio.from_thread.run(async_task)
    except BaseException as e:
        print("raised", e)
    else:
        print("returned", ret)


async def main():
    async with trio.open_nursery() as group:
        group.start_soon(partial(trio.to_thread.run_sync, sync_task))
        await trio.sleep(1)
        group.cancel_scope.cancel()


trio.run(main)

Cancelling the outer cancel scope will cancel the inner task and this code will print raised Cancelled as the exception (correctly) propagates outwards into the sync_task function.

Other, minor problems

  • asyncio's Unix signal API consists entirely of loop.add_signal_handler, which takes a callback and schedules it on the event loop whenever the given signal is received. Compare this to Trio's open_signal_receiver API, which lets you listen for multiple signals with one object, uses a context manager to ensure the handler is cleaned up, and hands you an iterator instead of taking a callback, so the control flow is significantly more linear (see the sketch after this list).

  • Eager tasks were a performance optimisation where create_task runs a task immediately, up to its first suspension point, as opposed to lazy tasks, which don't run until the next tick of the event loop.

    Unfortunately, they were broken on release (1, 2) when interacting with TaskGroup, and libraries often depend on the explicit lazy-task semantics that have existed up to the present day.

  • Speaking of TaskGroups, they are a mechanism to enforce structured concurrency in an asyncio world. But due to asyncio's lack of block-based cancellation - it only supports cancellation of single tasks - there's no way to cancel entire task groups. You have to cancel the task running the TaskGroup instead, which doesn't work if you only want to cancel a nested TaskGroup and not the root one.

    Trio does not have this issue because it has block-scoped cancellation instead.
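
To illustrate the signal point from the first bullet, here's a minimal sketch of Trio's API (the choice of signals and the prints are mine):

import signal

import trio

async def watch_signals():
    with trio.open_signal_receiver(signal.SIGINT, signal.SIGTERM) as sigs:
        async for signum in sigs:  # linear control flow, no callbacks
            print("got signal", signum)
            if signum == signal.SIGTERM:
                return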

Conclusion

asyncio is not a good library. It is full of sharp edges, with implementation details leaking everywhere and poorly designed APIs forcing end users into odd code patterns to work around fundamental flaws in the interfaces.

Trio fixes nearly every single issue in this post. AnyIO implements Trio-like semantics on top of asyncio, whilst still letting you use most parts of libraries designed for asyncio.