
Why Should Asyncio.streamwriter.drain Be Explicitly Called?

From the docs:

    write(data)
        Write data to the stream.
        This method is not subject to flow control. Calls to write() should be followed by drain().

    coroutine drain()
        Wait until it is appropriate to resume writing to the stream.

Solution 1:

From what I understand, (1) You need to call drain every time write is called. (2) If not I guess, write will block the loop thread

Neither is correct, but the confusion is quite understandable. The way write() works is as follows:

  • A call to write() just stashes the data to a buffer, leaving it to the event loop to actually write it out at a later time, and without further intervention by the program. As far as the application is concerned, the data is written in the background as fast as the other side is capable of receiving it. In other words, each write() will schedule its data to be transferred using as many OS-level writes as it takes, with those writes issued when the corresponding file descriptor is actually writable. All this happens automatically, even without ever awaiting drain().

  • write() is not a coroutine, and it absolutely never blocks the event loop.

The second property sounds convenient - you can call write() wherever you need to, even from a function that's not async def - but it's actually a major flaw of write(). Writing as exposed by the streams API is completely decoupled from the OS accepting the data, so if you write data faster than your network peer can read it, the internal buffer will keep growing and you'll have a memory leak on your hands. drain() fixes that problem: awaiting it pauses the coroutine if the write buffer has grown too large, and resumes it once the os.write() calls performed in the background succeed and the buffer shrinks.

You don't need to await drain() after every write, but you do need to await it occasionally, typically between iterations of a loop in which write() is invoked. For example:

while True:
    response = await peer1.readline()
    peer2.write(b'<response>')
    peer2.write(response)
    peer2.write(b'</response>')
    await peer2.drain()

drain() returns immediately if the amount of pending unwritten data is small. If the data exceeds a high threshold, drain() will suspend the calling coroutine until the amount of pending unwritten data drops beneath a low threshold. The pause will cause the coroutine to stop reading from peer1, which will in turn cause the peer to slow down the rate at which it sends us data. This kind of feedback is referred to as back-pressure.
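The high and low thresholds mentioned above are the transport's write-buffer "watermarks", and they can be tuned with transport.set_write_buffer_limits(). Here is a minimal, self-contained demo of that (the server, port choice, and watermark values are illustrative, not from the original answer): the client lowers the watermarks so that drain() has a realistic chance of suspending even with a small workload.

```python
import asyncio

async def main():
    # A throwaway local server (port chosen by the OS) whose handler just
    # counts bytes; the client pushes 100 KiB through it with drain().
    received = []
    done = asyncio.Event()

    async def handle(reader, writer):
        while True:
            chunk = await reader.read(4096)
            if not chunk:  # EOF: the client closed its end
                break
            received.append(len(chunk))
        writer.close()
        done.set()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection("127.0.0.1", port)

    # Lower the watermarks so this toy workload can actually hit them:
    # drain() suspends above `high` pending bytes and resumes below `low`.
    writer.transport.set_write_buffer_limits(high=16 * 1024, low=4 * 1024)

    for _ in range(100):
        writer.write(b"x" * 1024)
        await writer.drain()  # pauses only while the buffer exceeds `high`

    writer.close()
    await writer.wait_closed()
    await done.wait()
    server.close()
    await server.wait_closed()
    return sum(received)
```

Whether drain() ever actually suspends here depends on timing; either way, all the data arrives, and the loop's memory use stays bounded by the high watermark.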

Buffering should be handled inside write function and application should not care.

That is pretty much how write() works now - it does handle buffering and it lets the application not care, for better or worse. Also see this answer for additional info.
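The claim that write() flushes in the background without any drain() can be checked with a small self-contained demo (the localhost server and the payload are illustrative assumptions): the client only calls write() and close(), yet the data still arrives, because the event loop keeps flushing the buffer.

```python
import asyncio

async def main():
    # Demo: data passed to write() alone (no drain()) is still delivered,
    # as long as the event loop keeps running.
    received = bytearray()
    done = asyncio.Event()

    async def handle(reader, writer):
        received.extend(await reader.read())  # read until EOF
        writer.close()
        done.set()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection("127.0.0.1", port)

    writer.write(b"hello, ")  # just buffers the data...
    writer.write(b"world")    # ...the event loop flushes it later
    writer.close()            # close() still flushes what is buffered
    await writer.wait_closed()

    await done.wait()
    server.close()
    await server.wait_closed()
    return bytes(received)
```

Note that this only demonstrates delivery, not flow control - without drain(), nothing ever pushes back on a writer that outpaces its peer.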


Addressing the edited part of the question:

Reading the answer and links again, I think the functions work like this.

write() is still a bit smarter than that. It won't try to write only once; it will actually arrange for data to continue to be written until there is no data left to write. This will happen even if you never await drain() - the only thing the application must do is let the event loop run its course for long enough to write everything out.

A more correct pseudo code of write and drain might look like this:

class ToyWriter:
    def __init__(self):
        self._buf = bytearray()
        self._empty = asyncio.Event()
        self._empty.set()  # starts out empty

    def write(self, data):
        self._buf.extend(data)
        loop.add_writer(self._fd, self._do_write)
        self._empty.clear()

    def _do_write(self):
        # Automatically invoked by the event loop when the
        # file descriptor is writable, regardless of whether
        # anyone calls drain()
        while self._buf:
            try:
                nwritten = os.write(self._fd, self._buf)
            except OSError as e:
                if e.errno == errno.EWOULDBLOCK:
                    return  # continue once we're writable again
                raise
            self._buf = self._buf[nwritten:]
        self._empty.set()
        loop.remove_writer(self._fd, self._do_write)

    async def drain(self):
        if len(self._buf) > 64 * 1024:
            await self._empty.wait()

The actual implementation is more complicated because:

  • it's written on top of a Twisted-style transport/protocol layer with its own sophisticated flow control, not on top of os.write;
  • drain() doesn't really wait until the buffer is empty, but until it reaches a low watermark;
  • exceptions other than EWOULDBLOCK raised in _do_write are stored and re-raised in drain().

The last point is another good reason to call drain() - to actually notice that the peer is gone by the fact that writing to it is failing.
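In terms of the ToyWriter pseudocode above, that exception bookkeeping might look roughly like this (still pseudocode, with a hypothetical self._exc attribute initialized to None in __init__):

```
def _do_write(self):
    while self._buf:
        try:
            nwritten = os.write(self._fd, self._buf)
        except OSError as e:
            if e.errno == errno.EWOULDBLOCK:
                return
            self._exc = e          # stash the error...
            self._empty.set()      # ...wake up anyone draining...
            loop.remove_writer(self._fd, self._do_write)
            return
        self._buf = self._buf[nwritten:]
    self._empty.set()
    loop.remove_writer(self._fd, self._do_write)

async def drain(self):
    if self._exc is not None:
        raise self._exc            # ...and re-raise it here
    if len(self._buf) > 64 * 1024:
        await self._empty.wait()
```

A writer that never awaits drain() never sees the stashed exception, which is exactly how a dead peer can go unnoticed.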
