asyncio: a library with too many sharp corners

One of the headliner features of Python 3.4 (released in 2014) was a new library in the standard library: `asyncio`, provisionally introduced for feedback as an import of the external `tulip` library. In Python 3.5 (released in 2015), `async` and `await` were added to the language specifically for use with asynchronous libraries, replacing the use of `yield from`. The `asyncio` module was made non-provisional the following year in Python 3.6, heralding an entire new ecosystem of asynchronous libraries in the Python world.
But `asyncio` has so many sharp corners and design issues that it is far too difficult to use, bordering on being fundamentally broken. Some of these issues were only realised in hindsight, because other languages (Kotlin, Swift) and libraries did what `asyncio` does significantly better. But most of them were bad at release, and it is baffling how the library made it out of provisional status with such glaring flaws.
I mention the Trio library a lot in this post, but there's also the AnyIO library, which implements Trio-like semantics on top of `asyncio`. It fixes most of the issues described here whilst retaining a level of compatibility with regular `asyncio` libraries.
Contents
- Major Problem #1: Cancellation is broken
- Major Problem #2: Task was destroyed but it is pending!
- Major Problem #3: I/O has pointless landmines
- Major Problem #4: `asyncio.Queue` is difficult to use
- Other, less major problems
Major Problem #1: Cancellation is broken
In the traditional model of concurrent programming using threads, there is no clean way to do cancellation. In the standard pthreads model, the only way to do cancellation is to brutally murder a thread using `pthread_kill`. That is nearly always a bad idea, because anything the thread was using (such as locks) will be in an unknown and inconsistent state if the thread was killed in the middle of an operation. If you want a better mechanism, you need to implement it yourself by constantly polling a shared state object in between units of work, using a loop like so:
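```python
import threading

cancelled = threading.Event()

def do_work():
    ...  # placeholder for one unit of real work

def t1():
    # The thread only notices cancellation when it checks the flag between units of work.
    while not cancelled.is_set():
        do_work()

def main():
    threading.Thread(target=t1).start()
    # ... do something in between
    cancelled.set()
```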
This is unergonomic and error-prone. Only threads that opt in to this cancellation mechanism can be cancelled, and only when they explicitly check whether they are cancelled. Some languages (e.g. Java) make this a bit better by having a `Thread.interrupt()` method that handles communicating the interrupted state. Most standard library functions, such as `Object.wait()`, automatically check for the interrupted state. (This still falls victim to the other issues described here.)
`asyncio` is an asynchronous runtime, and it schedules its own tasks instead of leaving that to the kernel. When an `asyncio` task needs to interact with the system, it asks the event loop to suspend it until an operation is complete. The task is then rescheduled and runs again in the next tick of the event loop. Threads do the same, but with system calls, and the user application has no control over the kernel's scheduler beyond tweaking some tuning parameters.
This scheduling mechanism is reused to implement cancellation. When a task is cancelled, any pending operation that the event loop was performing for the task is cancelled, and the call raises a `CancelledError` instead. Unlike threads, tasks no longer need to check whether they have been cancelled. Every single time a call drops into the event loop, the runtime itself checks for cancellation. Conceptually, you can imagine every `await` as a cancellation point:
```python
async def something(stream: SomeLibraryStream):
    while True:
        result = await stream.read()  # Cancellation point
        parsed = do_parse(result)  # *not* a cancellation point
```
From this, we can derive a conceptual model of how tasks and cancellations interact:
- Tasks run until an `await`, at which point they suspend.
- Something else calls `task.cancel()`, which reschedules the task again.
- The function that was being `await`-ed now raises `CancelledError`.
- This exception propagates backwards, unwinding through all functions in the call stack and cleaning up as it goes.
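You can observe this model directly with plain `asyncio`. In the small sketch below, the function names are invented for illustration. A task is cancelled while it is suspended at an `await`. The resulting `CancelledError` then unwinds through two `finally` blocks before it reaches the code that awaits the task:

```python
import asyncio

events: list[str] = []

async def inner() -> None:
    try:
        await asyncio.sleep(10)  # cancellation point: CancelledError is raised here
    finally:
        events.append("inner cleanup")

async def outer() -> None:
    try:
        await inner()
    finally:
        events.append("outer cleanup")

async def main() -> None:
    task = asyncio.create_task(outer())
    await asyncio.sleep(0)  # let the task run until its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("task cancelled")

asyncio.run(main())
print(events)  # ['inner cleanup', 'outer cleanup', 'task cancelled']
```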
This avoids both problems with threads: tasks can be externally killed without worrying about resources not being torn down, and end-user tasks don't need to constantly check if they've been cancelled because the event loop does it for you.
But that's not how it works
Consider the function below, which returns a resource wrapped in an asynchronous context manager. When the user is done, it needs to clean up some resources (say, a server needs a clean close). This cleanup should happen regardless of whether the code running inside the context manager succeeded, so it runs inside a `finally` block:
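```python
from contextlib import asynccontextmanager
from typing import AsyncIterator

# Sock and connect_socket stand in for some asynchronous socket library.
@asynccontextmanager
async def connect_to_server(ip: str, *, port: int = 6767) -> AsyncIterator[Sock]:
    sock = await connect_socket(ip, port)
    async with sock:
        await sock.send(b"IDENTIFY ident :bar\r\nNICKNAME :gquuuuuux\r\n")
        try:
            yield sock
        finally:
            await sock.send(b"QUIT :died to some small fry")
```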
In this case, let's say `.send()` waits for some form of acknowledgement message. Another task, spawned somewhere else, sends a `PING` message to the server every few seconds and expects a `PONG` message back. If the client goes too long without receiving a `PONG`, it cancels the task inside the context manager and exits itself.
What happens if the server does stop responding, and the task is cancelled? Let's see:
- First, any code in the user function running inside the asynchronous context manager is cancelled, with a `CancelledError` bubbling upwards.
- Next, the `yield sock` expression raises a `CancelledError`, and control flows into the `finally` block.
- The code enters the `sock.send()` function, which re-enters the event loop. The event loop completely forgets that the task was cancelled. It is entirely happy to deadlock the application forever, waiting for the server to respond to the `.send()` (which will never happen).
This is because cancellations in `asyncio` are edge-triggered, not level-triggered. These concepts come mostly from electronics, but they also apply to certain types of programming. An edge-triggered event only fires once, when the state changes. In this case, it's `Task.cancel()` raising a cancellation error exactly once. Level-triggered cancellation is the opposite behaviour: cancelling a task causes every subsequent call into the event loop to raise a `CancelledError`, forever.
Here's a more practical example that you can run directly on your computer to see this behaviour.
```python
import asyncio

event = asyncio.Event()

async def fn():
    try:
        event.set()
        await asyncio.sleep(60)
    finally:
        await asyncio.sleep(5)
        print("slept for 5s")

async def main():
    task = asyncio.create_task(fn())
    await event.wait()
    task.cancel()
    await asyncio.sleep(10)

asyncio.run(main())
```
When you run this, the first `sleep(60)` is cancelled. The program then sleeps for five more seconds before printing a `slept for 5s` message, because the cancellation disappeared.
This is absolutely 100% the wrong behaviour, and it makes cancellations dangerous because they can be swallowed or covered up at any point:
- Using a bare `except:`? Swallows cancellations. People will lie and say that they don't write these, but people do use bare `except`s. Even if you don't, do you know that every other library doesn't?
- Doing cleanup in `__aexit__`? Can deadlock waiting for something that will never happen, swallowing the cancellation.
- Doing cleanup in `try`/`finally`? See above.
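Here is a minimal sketch of the first case, using an invented `careless` coroutine. The bare `except:` catches the `CancelledError`, and the task carries on as if nothing happened. From the outside, the task does not even report itself as cancelled:

```python
import asyncio

async def careless() -> str:
    try:
        await asyncio.sleep(10)
    except:  # a bare except also catches asyncio.CancelledError...
        pass  # ...and silently discards it
    await asyncio.sleep(0.01)  # this await is *not* interrupted
    return "finished normally"

async def main() -> tuple[bool, str]:
    task = asyncio.create_task(careless())
    await asyncio.sleep(0)  # let the task reach its first await
    task.cancel()
    result = await task
    return task.cancelled(), result

print(asyncio.run(main()))  # (False, 'finished normally')
```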
It could be better
Graceful asynchronous cleanup is intrinsically a difficult problem: if an operation blocks for too long, what do you do? If you adopt a rigid rule of always trying to be graceful, you risk deadlocks when an operation never returns. If you avoid doing anything gracefully and just sever connections and open files with a machete, you can end up with half-written data or some very unhappy servers on the other end. It doesn't really matter in the `asyncio` world, because the library doesn't give you any tools to implement this.
The Trio library takes the opposite approach; all cancellations are level-triggered. Let's port the sleeping example above to use Trio instead:
```python
import trio

event = trio.Event()

async def task():
    try:
        event.set()
        await trio.sleep(60)
    finally:
        await trio.sleep(5)
        print("slept for 5s")

async def main():
    async with trio.open_nursery() as n:
        n.start_soon(task)
        await event.wait()
        n.cancel_scope.cancel()
        await trio.sleep(10)  # Not needed, but for parity with the previous example.

trio.run(main)
```
Running this will produce... no output. It won't wait either, because anything that could wait has been cancelled. If you add a `print()` between the `event.wait()` and the `cancel_scope.cancel()`, that will print too. So the program isn't exiting early without running anything.
This raises a question: how do you do graceful cleanup? With shielded cancel scopes and timeouts. I'll replace the `finally` block above with one of those:
```python
finally:
    with trio.move_on_after(1, shield=True):
        await trio.sleep(5)
    print("slept for 1s?")
    await trio.sleep(5)
    print("slept for 5s?")
```
Running this will print `slept for 1s?`, but nothing more. The code inside the shielded scope ignored the outside cancellation, but its own one-second timeout cancelled it anyway. This once again nets you the best of both worlds: cancellations aren't swallowed unless you explicitly opt in. Remember the Zen of Python: Explicit is better than implicit.
Major Problem #2: Task was destroyed but it is pending!
If you've ever used an `asyncio` application, you've probably seen that message pop up before. For example, if I press Ctrl-C on portage too quickly, it spits out a few of those errors. Why? Because `asyncio` does not keep strong references to tasks. Quoting the official documentation:
> Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done. For reliable “fire-and-forget” background tasks, gather them in a collection: [...]
Let's take some example code:
```python
import asyncio, gc

async def expose_bugs():
    while True:
        await asyncio.sleep(0.5)
        # Simulate doing work that would have the GC fire.
        gc.collect()

async def has_bug():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await fut

async def main():
    t = asyncio.create_task(expose_bugs())
    asyncio.create_task(has_bug())
    await asyncio.sleep(5)

asyncio.run(main())
```
If you run this, it will print a warning to stderr saying that `has_bug` was destroyed while it was still pending. Nothing holds a strong reference to `has_bug`, so when the GC runs, the task is collected despite the event loop's weak reference, and it is dropped on the floor. Goodbye, `has_bug`.
This is very obviously insane behaviour, but you can mostly avoid it by always holding references to spawned tasks (much as you can avoid segmentation faults by always doing bounds checking). But it gets worse. A set of helper functions is used for corralling tasks around: `wait_for`, `gather` and `shield`. These can all cause a function being waited on to be dropped on the floor, because they internally spawn that function as a task and wait on the task instead:
```python
import asyncio, gc

async def expose_bugs():
    while True:
        await asyncio.sleep(0.5)
        # Simulate doing work that would have the GC fire.
        gc.collect()

async def has_bug():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await fut

async def shield_task():
    await asyncio.shield(has_bug())

async def main():
    t1 = asyncio.create_task(expose_bugs())
    t2 = asyncio.create_task(shield_task())
    # scheduling pass
    await asyncio.sleep(1)
    t2.cancel()
    await asyncio.sleep(2)

asyncio.run(main())
```
When `t2` is cancelled, the outer `await asyncio.shield(...)` call is cancelled. The shielding stops the cancellation from propagating into `has_bug`, and the outer task still has a strong reference in the form of `t2`. But nothing holds a strong reference to `has_bug`'s task; the only reference was in the local variables of the `shield()` function. The next time `expose_bugs` wakes up and calls `gc.collect()`, the `has_bug` task is dropped entirely.
You might try to avoid this by calling `create_task` explicitly. That keeps a strong reference to the `has_bug()` task in the local variables of the cancelled `shield_task` coroutine, like so:
```python
async def shield_task():
    inner = asyncio.create_task(has_bug())
    await asyncio.shield(inner)
```
But this only works as long as the handle to `t2` lives inside `main()`. If that handle gets dropped, the inner `has_bug` task will also get dropped! Adding a `del t2` after the `t2.cancel()` will expose this immediately. Good luck tracking this through a web of classes and tasks.
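The official documentation's answer to the first half of this problem is to keep fire-and-forget tasks in a collection yourself. Below is a minimal sketch of that pattern. `spawn` and `background_tasks` are names invented for this sketch. It does nothing for the tasks that `shield`, `gather` and friends create internally:

```python
import asyncio
import gc

# Strong references to every running background task.
background_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    """Start a fire-and-forget task that cannot be garbage collected while pending."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task finishes so the set doesn't grow forever.
    task.add_done_callback(background_tasks.discard)
    return task

async def has_bug():
    # As in the example above: waits on a future that nothing else references.
    await asyncio.get_running_loop().create_future()

async def main() -> int:
    spawn(has_bug())
    await asyncio.sleep(0)  # let the task reach its first await
    gc.collect()  # with only the loop's weak reference, this would destroy the task
    alive = sum(1 for t in background_tasks if not t.done())
    for t in list(background_tasks):
        t.cancel()  # clean up before the loop shuts down
    return alive

print(asyncio.run(main()))  # 1
```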
Major Problem #3: I/O has pointless landmines
The underlying API for performing network I/O is the ever-venerable BSD socket API. Python exposes a nice object-based API for working with sockets; let's look at some code that opens a connection on a socket and sends some data.
```python
import socket

s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.connect(("2001:708:40:2001::11ba", 6667))
s.send(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
motd = s.recv(4096)
s.shutdown(socket.SHUT_RDWR)  # Graceful close, send an EOF
s.close()
```
This is pretty bare-bones, but it's easy to see how the code flows: top to bottom. It's a set of linear statements:
- Create the socket with the appropriate options.
- Connect it to an address directly.
- Send some data over the socket.
- Receive some data from the socket.
- Shut it down and close the socket.
This all happens in order; it's simple to follow. Trio offers an asynchronous version of this, so let's rewrite the code using Trio sockets:
```python
import socket

import trio

s = trio.socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
await s.connect(("2001:708:40:2001::11ba", 6667))
await s.send(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
motd = await s.recv(4096)
s.shutdown(socket.SHUT_RDWR)  # Graceful close, schedules an EOF
s.close()
```
The code is almost identical, with `await` added before every call that would normally block. Again, the control flow is simple; it runs from top to bottom in a linear fashion. Let's look at `asyncio`'s version of sockets, which are called protocols:
```python
import asyncio
from functools import partial

class IdentifyProtocol(asyncio.Protocol):
    def __init__(self, message: bytes, motd_future: asyncio.Future):
        self.message = message
        self.motd = motd_future

    def connection_made(self, transport: asyncio.WriteTransport):
        # self.message is already bytes, so it is written as-is.
        transport.write(self.message)

    def data_received(self, data: bytes):
        # A Future can only be resolved once, so any later chunks are dropped here.
        if not self.motd.done():
            self.motd.set_result(data)

    def connection_lost(self, exc: Exception | None):
        ...

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    transport, protocol = await loop.create_connection(
        partial(IdentifyProtocol, b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n", fut),
        "2001:708:40:2001::11ba",
        6667,
    )
    motd = await protocol.motd
```
Unlike regular BSD sockets or Trio's socket wrappers, `asyncio` uses callbacks (synchronous ones, at that) to implement the low-level I/O primitives. The control flow here jumps around a lot:
- `create_connection` is equivalent to `socket.socket` + `socket.connect`. Okay. You don't get to set socket options (at least it sets `TCP_NODELAY`), and it doesn't work for anything other than regular `AF_INET`/`AF_INET6` sockets. It returns a tuple of `(write transport, protocol instance)`; the former can be used to send further data (synchronously).
- When the socket is opened, it jumps into my class and calls `connection_made` (synchronously). That provides a "transport", whose (synchronous) `write` method I call to send my authentication message. There's no way to wait for this to be sent, because `WriteTransport.write` is synchronous. It'll get sent at some point in the future. Maybe. If you want to let somebody know that you've sent the message, you'll need to implement that yourself too.
- After some time, the server responds, and the event loop calls `data_received` (synchronously) with the received data. If you want to do something with this data (asynchronously), you need to pass it to the outside world yourself using futures or queues. In this case, I've used a regular `Future`. I haven't even thought yet about how to swap the future out in a non-error-prone way for later reads.
- The outside world now reads the data from the future.

That's three separate places where I've had to deal with the data, versus a single place with a linear order for sockets.
The biggest difference between raw sockets and protocols is that protocols have their incoming data pushed to you. If you simply want to wait for data to arrive, you need to implement that yourself! And this is only a basic protocol. More complex protocols require you to implement more complicated synchronisation mechanisms by hand to communicate with the entirely synchronous protocol callbacks. The result is a mess of either `create_task` everywhere or manually shuffling futures/events around.
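To make "implement that yourself" concrete, here is one minimal way to turn a protocol's pushed data back into something you can `await`. The callbacks feed an `asyncio.Queue`, and a coroutine pulls from it. This is a sketch, not an `asyncio` API: `QueueProtocol` and `read_all` are invented for illustration. `loop.connect_accepted_socket` wraps one end of a local `socket.socketpair()`, so the example runs without a network:

```python
from __future__ import annotations

import asyncio
import socket

class QueueProtocol(asyncio.Protocol):
    """Hand pushed data over to a queue so callers can await it."""

    def __init__(self) -> None:
        # None marks end-of-stream.
        self.queue: asyncio.Queue[bytes | None] = asyncio.Queue()

    def data_received(self, data: bytes) -> None:
        # Runs synchronously inside the event loop: all we can do is hand the data off.
        self.queue.put_nowait(data)

    def eof_received(self) -> bool:
        self.queue.put_nowait(None)
        return False  # let the transport close itself

    def connection_lost(self, exc: Exception | None) -> None:
        self.queue.put_nowait(None)

async def read_all(protocol: QueueProtocol) -> bytes:
    """Pull chunks until end-of-stream, in the linear style of the socket examples."""
    chunks = []
    while (chunk := await protocol.queue.get()) is not None:
        chunks.append(chunk)
    return b"".join(chunks)

async def main() -> bytes:
    loop = asyncio.get_running_loop()
    ours, theirs = socket.socketpair()
    transport, protocol = await loop.connect_accepted_socket(QueueProtocol, ours)
    theirs.sendall(b"hello ")
    theirs.sendall(b"world")
    theirs.shutdown(socket.SHUT_WR)  # EOF for our side
    data = await read_all(protocol)
    transport.close()
    theirs.close()
    return data

print(asyncio.run(main()))  # b'hello world'
```

This still only covers plain reads. Writes, back-pressure and errors each need their own plumbing of the same kind.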
Why is it like this? Because Twisted was like this. But Twisted existed in a world before `yield from` or `await`, so it has an excuse. `asyncio` copied it in a world with `yield from` and `await`, so it has no excuse.
And no, the answer is also not "because Windows doesn't support a Unix-style `select()` API properly". If you want `select()` semantics on Windows, use `\Device\Afd` like everyone else does (and by everyone else, I mean the entire JavaScript and Rust ecosystem).
That's not fair
That's true. It's rare that you'll actually interact with protocols; they are a weird implementation detail of `asyncio`'s event loop mechanisms. The same goes for Trio sockets, but at least Trio sockets can be used for esoteric mechanisms like `AF_NETLINK` or `SOCK_RAW` whilst still keeping the nice asynchronous API. (You can implement those socket types on `asyncio` with the even lower-level `add_{reader|writer}` APIs, but that's not a topic for today.)
Instead, most `asyncio` and Trio programs will use streams, a high-level generic API that treats network connections as nothing more than a stream of bytes. Here's how the previous socket example would be written using Trio's streams:
```python
async with await trio.open_tcp_stream("irc.libera.chat", port=6667) as stream:
    # stream is a trio.SocketStream
    await stream.send_all(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
```
This is very simple. The returned stream works as an asynchronous context manager that automatically closes the socket when done, whether the inner code succeeds or fails. The `send_all` method automatically retries when the underlying socket returns a partial write, so the user doesn't need to write retry logic for partial writes by hand.
Here's how you do it in `asyncio`:
```python
reader, writer = await asyncio.open_connection("irc.libera.chat", port=6667)
try:
    writer.write(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
    await writer.drain()
finally:
    writer.close()
    await writer.wait_closed()
```
This is similar to the Trio example, with two major differences:
- `writer.write` is synchronous and does not actually perform a full write unless `drain()` is called.
- `writer.close` does not actually perform a close, it only schedules one, and you need to use `wait_closed` to ensure the stream is closed. Also, `wait_closed` will block if the `drain` method is cancelled. The cancellation issues are everywhere.
The `write`/`drain` pair exists entirely as a footgun for anyone who forgets to call `drain`. Data may get written in the background if you don't call `drain()`. But in a tight loop with lots of data to write and no other `await` calls, all of that data goes into the stream's internal buffer without being sent. Even if the writer task does get rescheduled, the buffer may still fill up if data is written faster than the background writer can empty it. This is stupid!
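The usual defence is to never call `write` on its own. Below is a sketch of a hypothetical `send_all` helper that always pairs `write` with `drain()`. With it, a tight loop of writes waits whenever the transport's buffer is above its high-water mark, instead of growing without bound. A local `socket.socketpair()` stands in for a real connection:

```python
import asyncio
import socket

async def send_all(writer: asyncio.StreamWriter, data: bytes) -> None:
    """Buffer data, then wait until the write buffer has drained below its high-water mark."""
    writer.write(data)
    await writer.drain()

async def main() -> int:
    left, right = socket.socketpair()
    _, writer = await asyncio.open_connection(sock=left)
    reader, right_writer = await asyncio.open_connection(sock=right)
    chunk = b"x" * 65536
    count = 64  # 4 MiB in total, far more than the default 64 KiB high-water mark

    async def produce() -> None:
        for _ in range(count):
            await send_all(writer, chunk)  # suspends whenever the buffer is too full
        writer.close()
        await writer.wait_closed()

    async def consume() -> int:
        return len(await reader.readexactly(count * len(chunk)))

    _, received = await asyncio.gather(produce(), consume())
    right_writer.close()
    await right_writer.wait_closed()
    return received

print(asyncio.run(main()))  # 4194304
```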
The situation with `close`/`wait_closed` is not too different. `close()` schedules a close, and `wait_closed` waits for that close to actually be sent. What happens if `wait_closed` is cancelled? `asyncio` doesn't really define the semantics for this. The Trio world, by contrast, defines them very explicitly. In Trio, all closeable objects follow the `AsyncResource` ABC, which defines an idempotent `aclose` method that must always succeed.
So what happens for protocols such as TLS that need to send a graceful goodbye message? Trio's SSL helpers try to send a graceful close, and if that times out, the stream is severed by force instead. The end user doesn't need to know anything about this. They can call `aclose` on a resource to close it, without worrying about whether it will be cancelled or whether the resource is actually closed.
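For comparison, here is roughly what you have to write yourself to get an `aclose`-like guarantee on top of `asyncio` streams. The helper attempts a graceful close, and if the close hasn't finished within a deadline, it aborts the transport. The `aclose` name and the one-second default are assumptions made for this sketch. `asyncio.shield` is there so that the timeout cancels only our wait, not the writer's internal close-waiter future:

```python
import asyncio
import socket

async def aclose(writer: asyncio.StreamWriter, timeout: float = 1.0) -> None:
    """Close gracefully if possible, otherwise sever the connection."""
    writer.close()  # only schedules the close; buffered data is still pending
    try:
        # shield(): the timeout cancels our wait, not the writer's close-waiter future
        await asyncio.wait_for(asyncio.shield(writer.wait_closed()), timeout)
    except asyncio.TimeoutError:
        writer.transport.abort()  # discard buffered data and close immediately
    except OSError:
        pass  # the connection already failed, so it is closed either way

async def main() -> bool:
    loop = asyncio.get_running_loop()
    ours, theirs = socket.socketpair()
    _, writer = await asyncio.open_connection(sock=ours)
    # The peer never reads, so this much data can never be flushed.
    writer.write(b"x" * (32 * 1024 * 1024))
    started = loop.time()
    await aclose(writer, timeout=0.2)
    elapsed = loop.time() - started
    # The close-waiter was shielded, so waiting on it again still works.
    await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
    theirs.close()
    return elapsed < 1.0

print(asyncio.run(main()))  # True
```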
Major Problem #4: `asyncio.Queue` is difficult to use
I have two tasks: a producer (that makes messages) and a consumer (that eats messages). Here they are:
```python
async def producer():
    while True:
        message = await do_some_networking_thing()
        # I don't know how to send a message...

async def consumer():
    while True:
        message = ...  # I don't know how to receive a message...
        await eat(message)
```
How do I get messages between them? I could use a `Future`, but that would only work exactly once, and both of these functions run in a loop. I could find a way to ferry `Future` instances between them, but if I could do that, I would use the ferry to communicate the messages instead.
The solution is an `asyncio.Queue`, which is the asynchronous version of `queue.Queue` (which is the Python version of `java.util.concurrent.ArrayBlockingQueue`). Let's pass a queue to both functions:
```python
async def producer(queue: asyncio.Queue):
    while True:
        message = await do_some_networking_thing()
        await queue.put(message)

async def consumer(queue: asyncio.Queue):
    while True:
        message = await queue.get()
        await eat(message)

async def main():
    queue = asyncio.Queue()
    t1 = asyncio.create_task(producer(queue))
    t2 = asyncio.create_task(consumer(queue))
    while True:
        await asyncio.sleep(99999999)

asyncio.run(main())
```
The producer loops forever, creating items and putting them in the queue. The consumer loops forever, reading items from the queue and doing something with them. This is a very common pattern, similar to communicating sequential processes. But what happens if `consumer` throws an exception in `eat`? Let's go over the control flow:
1. `producer` produces an item and sends it to the queue.
2. `consumer` receives an item and calls `eat`.
3. `eat` raises an exception and the consumer task dies. For the sake of understanding, assume this is a transient external exception, unrelated to either the code or the item being consumed.
4. `producer` produces an item and sends it to the queue.
5. `producer` produces an item and sends it to the queue.
6. `producer` produces an item and sends it to the queue.
7. Your system locks up as the out-of-memory killer fails to run.
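A scaled-down version of this failure can be run safely. In the sketch below, `eat` is a hypothetical function that always fails. The producer stops after 1,000 items instead of running until the machine runs out of memory:

```python
import asyncio

async def eat(message: int) -> None:
    # Hypothetical consumer work that hits a transient failure.
    raise ConnectionError("transient failure")

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        message = await queue.get()
        await eat(message)

async def producer(queue: asyncio.Queue, count: int) -> None:
    for i in range(count):
        await queue.put(i)  # never blocks: the queue is unbounded
        await asyncio.sleep(0)  # yield so the consumer gets a chance to run

async def main() -> tuple[str, int]:
    queue: asyncio.Queue = asyncio.Queue()
    consumer_task = asyncio.create_task(consumer(queue))
    await producer(queue, 1000)
    # Retrieving the exception also stops asyncio logging "never retrieved".
    died_with = consumer_task.exception()
    return type(died_with).__name__, queue.qsize()

print(asyncio.run(main()))  # ('ConnectionError', 999)
```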
This is because the consumer exerts no backpressure on the producer. The producer will gladly keep sending items, forever, into a queue that nobody is listening to. I can add some backpressure by using `asyncio.Queue(maxsize=1)`, which changes the control flow like so:
1. `producer` produces an item and sends it to the queue.
2. `consumer` receives an item and calls `eat`.
3. `eat` raises an exception and the consumer task dies.
4. `producer` produces an item and sends it to the queue.
5. `producer` produces an item and tries to send it to the queue, but blocks forever because nobody is reading from the queue.
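The same experiment with `maxsize=1` shows the hang instead of the leak. `eat` is again a hypothetical, always-failing function. The timeout exists only so that the demonstration terminates:

```python
import asyncio

async def eat(message: int) -> None:
    # Hypothetical consumer work that hits a transient failure.
    raise ConnectionError("transient failure")

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        message = await queue.get()
        await eat(message)

async def producer(queue: asyncio.Queue, count: int) -> None:
    for i in range(count):
        await queue.put(i)  # blocks while the queue is full

async def main() -> tuple[bool, int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    consumer_task = asyncio.create_task(consumer(queue))
    try:
        # Without the timeout, this would wait forever.
        await asyncio.wait_for(producer(queue, 1000), timeout=0.1)
        hung = False
    except asyncio.TimeoutError:
        hung = True
    consumer_task.exception()  # the consumer died on the first item
    return hung, queue.qsize()

print(asyncio.run(main()))  # (True, 1)
```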
That's a little better, in the sense that it won't leak memory forever. Instead, it will lock up forever, because the producer has no way of knowing that the consumer isn't listening anymore. Python 3.13 added the `Queue.shutdown` method, which lets one (or both) sides know that the queue is closed and can't accept (or receive) any new items. (If you're stuck on Python 3.12 or earlier, there's no `Queue.shutdown` available.) Let's adjust the code to use that:
```python
async def consumer(queue: asyncio.Queue):
    while True:
        message = await queue.get()
        try:
            await eat(message)
        except:
            queue.shutdown()
            raise
```
Now the control flow goes as follows:
1. `producer` produces an item and sends it to the queue.
2. `consumer` receives an item and calls `eat`.
3. `eat` raises an exception and the consumer task dies.
4. `producer` produces an item and tries to send it to the queue, but fails because the queue is shut down.
Except... that's not true. There's a race condition between steps three and four. If `producer` puts an item into the queue before the consumer task is killed, that item remains in the queue forever. A pair of methods, `join()` and `task_done()`, can solve this, so my code now looks like this:
```python
async def producer(queue: asyncio.Queue):
    while True:
        message = await do_some_networking_thing()
        await queue.put(message)
        await queue.join()

async def consumer(queue: asyncio.Queue):
    while True:
        try:
            message = await queue.get()
            await eat(message)
        except:
            queue.shutdown(immediate=True)
            raise
        else:
            queue.task_done()
```
And the control flow goes as follows:
1. `producer` produces an item and sends it to the queue.
2. `producer` blocks until the consumer calls `task_done`.
3. `consumer` receives an item and calls `eat`.
4. `eat` raises an exception and the consumer task dies. The queue is shut down.
5. `queue.join` wakes up because I passed `immediate=True`. Without that, it would block forever.
6. `producer` produces an item and tries to send it to the queue, but `put` fails because the queue is shut down.
This eliminates the race condition entirely. It isn't a very useful pattern, though: with one consumer and one producer, it reduces to calling the consumer function directly from the producer. It would be more useful with a second consumer, assuming consumers are slower than the producer:
1. `producer` produces an item and sends it to the queue.
2. `producer` blocks until the consumer calls `task_done`.
3. Consumer task 1 receives an item and calls `eat`.
4. Consumer task 2 sits there idly because the producer can't do anything until the first consumer task has finished.
5. Consumer task 1 has an exception, and shuts down the queue.
6. Consumer task 2 has an exception because the queue was closed.
7. Everything explodes in a fiery mess of exceptions.
To fix this, consumer task 1 should not shut down the queue. Instead, it should log the error and exit, so that something else (perhaps an external supervisor) can restart it:
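```python
import logging

logger = logging.getLogger(__name__)

async def consumer(queue: asyncio.Queue):
    while True:
        try:
            message = await queue.get()
            await eat(message)
        except Exception:
            # Log the failure and exit; an external supervisor restarts this task.
            logger.exception("consumer failed")
            return
        else:
            queue.task_done()
```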
Let's look at the control flow for a final time:
1. `producer` produces an item and sends it to the queue.
2. `producer` blocks until the consumer calls `task_done`.
3. Consumer task 1 receives an item and calls `eat`.
4. Consumer task 2 sits there idly because the producer can't do anything until the first consumer task has finished.
5. Consumer task 1 has an exception, and returns.
6. Consumer task 2 remains blocked on `get`.
7. `producer` remains blocked on `join`.
8. The freshly rebooted consumer task 1 starts blocking on `get`.

This could be fixed by making the first consumer task try to re-insert the item on an exception, but what happens if the second task has had an error? Deadlocks. At this point, I give up and pull in an AMQP server instead of dealing with in-library queues.
It doesn't have to be this way
What I'm really looking for is a combination of the following:
- A queue that blocks until a receiver has retrieved the item (aka, automatic `.join()`).
- A queue that can be cloned and independently closed without affecting other consumers.
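The first property is the easier one to approximate on top of `asyncio`. Below is a minimal sketch of a zero-buffer "transfer" queue that uses nothing beyond the standard library. `put` hands the item over together with a future, and it only returns once a `get` has taken the item. `TransferQueue` is a name invented here, and the sketch deliberately does not attempt the second property (independently closeable clones):

```python
from __future__ import annotations

import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")

class TransferQueue(Generic[T]):
    """Zero-buffer queue: put() returns only after a get() has taken the item."""

    def __init__(self) -> None:
        # Each entry pairs an item with the future its sender is waiting on.
        self._pending: asyncio.Queue[tuple[T, asyncio.Future[None]]] = asyncio.Queue()

    async def put(self, item: T) -> None:
        handed_off: asyncio.Future[None] = asyncio.get_running_loop().create_future()
        self._pending.put_nowait((item, handed_off))
        # If the sender is cancelled here, asyncio cancels handed_off as well,
        # which tells get() to skip this item.
        await handed_off

    async def get(self) -> T:
        while True:
            item, handed_off = await self._pending.get()
            if not handed_off.done():
                handed_off.set_result(None)  # wake the sender
                return item
            # The sender gave up before anyone took the item; discard it.

async def main() -> tuple[int, list[str]]:
    queue: TransferQueue[int] = TransferQueue()
    order: list[str] = []

    async def producer() -> None:
        await queue.put(1)
        order.append("put returned")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)
    order.append("before get")  # the producer is still blocked in put()
    item = await queue.get()
    await task
    return item, order

print(asyncio.run(main()))  # (1, ['before get', 'put returned'])
```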
Trio's channels implement these behaviours. Let's rewrite the consumer/producer pattern to use channels:
```python
import logging

import trio

logger = logging.getLogger(__name__)
Message = bytes  # placeholder message type
# do_some_networking_thing() and eat() are placeholders, as above.

async def producer(channel: trio.MemorySendChannel[Message]):
    async with channel:
        while True:
            message: Message = await do_some_networking_thing()
            await channel.send(message)

async def consumer(channel: trio.MemoryReceiveChannel[Message]):
    async with channel:
        while True:
            message = await channel.receive()
            try:
                await eat(message)
            except Exception:
                logger.exception("consumer failed")
                return

async def main():
    send, receive = trio.open_memory_channel[Message](max_buffer_size=0)
    async with trio.open_nursery() as n:
        # Close the original receive channel once the clones exist,
        # so only the consumers' clones keep the channel open.
        async with receive:
            for _ in range(5):
                n.start_soon(consumer, receive.clone())
        n.start_soon(producer, send)
```
Trio channels with a buffer size of zero act as transfer queues, a name coined by Java 7 (released in 2011 (!!)): the sender always waits for a receiver to take a message from the channel. Each receiver gets its own clone of the receive channel, and the sender's messages are spread across the receivers in a round-robin fashion. These clones can be closed independently without affecting the other clones. Only once the final receive channel is closed will the sending channel begin raising errors. `TransferQueue` was created four solid years before `asyncio` existed. I really see no excuse for this behaviour to have existed when `asyncio` was being developed.
The only problem this doesn't solve is that if the consumer has an error after receiving an object, that object stays unprocessed. Both implementations have this problem, and channels don't (yet) fix it. But nothing in the conceptual model would prevent some form of `RetryingChannel` class that blocks the producer until an item is eventually processed. The same can't really be said of queues, which will always buffer at least one item no matter what you do.
A more detailed look at all the issues with backpressure can be read in this post by the Trio creator.
Less Major Problems, a collection
Whilst those four areas are some of the worst parts of `asyncio`, there are a lot of minor warts that make it unpleasant to use everywhere else.
Threads are stupid
It is inevitable that asynchronous code will need threads, whether for computationally intensive code or for libraries that still use blocking I/O. `asyncio` offers two APIs for this:
- `asyncio.to_thread`, which propagates context variables correctly to worker threads but doesn't let you specify the `concurrent.futures.ThreadPoolExecutor` to use.
- `loop.run_in_executor`, which doesn't propagate context variables but does let you specify the `ThreadPoolExecutor` to use. You need to wrap every function you pass in a `Context.run` call.
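The context-variable half of this trade-off is easy to see in isolation. In the sketch below, `request_id` is an invented context variable. Wrapping the function in `Context.run` makes `run_in_executor` see the caller's value, which `asyncio.to_thread` does for you. Without the wrapper, the worker thread typically runs in its own, separate context and so sees the default value:

```python
import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor

request_id = contextvars.ContextVar("request_id", default="unset")

def read_request_id() -> str:
    return request_id.get()

async def main() -> dict[str, str]:
    request_id.set("abc123")
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The worker thread runs in its own context, so this usually sees the default.
        plain = await loop.run_in_executor(pool, read_request_id)
        # Copy the caller's context and run the function inside it.
        ctx = contextvars.copy_context()
        wrapped = await loop.run_in_executor(pool, ctx.run, read_request_id)
    # to_thread copies the context automatically, but always uses the default executor.
    via_to_thread = await asyncio.to_thread(read_request_id)
    return {"plain": plain, "wrapped": wrapped, "to_thread": via_to_thread}

print(asyncio.run(main()))
```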
This trade-off is very niche, but it also doesn't need to exist. The more important problem with threads comes from calling back into the event loop from a thread: cancellation does not propagate properly! Take this example:
```python
import asyncio
from functools import partial

async def coro():
    await asyncio.sleep(5)
    print("as if i would be cancelled!")

def in_thread(loop: asyncio.AbstractEventLoop):
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)
    fut.result()

async def main():
    t = asyncio.create_task(asyncio.to_thread(partial(in_thread, asyncio.get_running_loop())))
    await asyncio.sleep(0)
    t.cancel()
    await asyncio.sleep(7)

asyncio.run(main())
```
Running this will print `as if i would be cancelled!`, because cancelling the `to_thread` task does not cancel the coroutine that the thread submitted back to the event loop.
Let's look at how Trio does it:
```python
from functools import partial

import trio
import trio.from_thread
import trio.to_thread

async def async_task():
    await trio.sleep(5)
    print("looks like I survived being cancelled")
    return 1

def sync_task():
    try:
        ret = trio.from_thread.run(async_task)
    except BaseException as e:
        print("raised", e)
    else:
        print("returned", ret)

async def main():
    async with trio.open_nursery() as group:
        group.start_soon(partial(trio.to_thread.run_sync, sync_task))
        await trio.sleep(1)
        group.cancel_scope.cancel()

trio.run(main)
```
Cancelling the outer cancel scope cancels the inner task, and this code prints `raised Cancelled` as the exception (correctly) propagates outwards into the `sync_task` function.
Other, minor problems
- `asyncio`'s Unix signal API consists entirely of `loop.add_signal_handler`, which takes a callback and schedules it on the event loop whenever a single signal is received. Compare this to Trio's `open_signal_receiver` API. It lets you listen to multiple signals with one object, uses a context manager to ensure that the handler is cleaned up, and gives you an asynchronous iterator instead of a callback, so the control flow is significantly more linear.
- Eager tasks were a performance optimisation in which `create_task` runs a task up to its first suspension point immediately. Lazy tasks, by contrast, do not run until the next tick of the event loop. Unfortunately, eager tasks were broken on release (1, 2) when interacting with `TaskGroup`, and libraries often depend on the explicit semantics of lazy tasks that have existed up to the present day.
- Speaking of `TaskGroup`s, they are a mechanism for enforcing structured concurrency in an `asyncio` world. But `asyncio` lacks block-based cancellation (it only supports cancelling single tasks), so there's no way to cancel an entire task group. You have to cancel the task running the `TaskGroup` instead, which doesn't work if you only want to cancel a nested `TaskGroup` and not the root one. Trio does not have this issue because it has block-scoped cancellation instead.
Conclusion
`asyncio` is not a good library. It is full of sharp edges. Its implementation details leak, and its poorly designed APIs force end users into odd code patterns to avoid fundamental flaws in the interfaces.
Trio fixes nearly every single issue in this post. AnyIO implements Trio-like semantics on top of `asyncio`, whilst still letting you use most parts of libraries designed for `asyncio`.