python - 为什么要显式调用 asyncio.StreamWriter.drain?

标签 python python-3.x python-asyncio streamwriter

来自 doc :

write(data)

Write data to the stream.

This method is not subject to flow control. Calls to write() should be followed by drain().

coroutine drain()

Wait until it is appropriate to resume writing to the stream. Example:

writer.write(data)
await writer.drain()

据我了解,

  • 每次调用write时都需要调用drain
  • 如果不是我猜,write 会阻塞循环线程

那write为什么不是自动调用的协程呢?为什么一个调用 write 而不必耗尽?我可以想到两种情况

  1. 您想立即编写关闭
  2. 您必须在消息完成之前缓冲一些数据。

第一个是特例,我想我们可以有不同的 API。缓冲应该在写函数内部处理,应用程序不应该关心。


让我以不同的方式提出这个问题。这样做的缺点是什么? python3.8版本有效吗?

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()

注意:drain 文档明确说明如下:

When there is nothing to wait for, the drain() returns immediately.


再次阅读答案和链接,我认为这些功能是这样工作的。 注意:检查已接受的答案以获得更准确的版本。

def write(data):
    remaining = socket.try_write(data)
    if remaining:
        _pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data

async def drain():
    if len(_pendingbuffer) < BUF_LIMIT:
        return
    await wait_until_other_side_is_up_to_speed()
    assert len(_pendingbuffer) < BUF_LIMIT

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()        

那么什么时候使用什么:

  1. 当数据不连续时,比如响应 HTTP 请求。我们只需要发送一些数据,不关心何时到达,内存也不关心 - 只需使用 write
  2. 同上但内存是个问题,使用awrite
  3. 当向大量客户端流式传输数据时(例如一些实时流或一个巨大的文件)。如果数据在每个连接的缓冲区中都是重复的,它肯定会溢出 RAM。在这种情况下,编写一个循环,每次迭代获取一大块数据并调用 awrite。如果文件很大,loop.sendfile 如果可用的话会更好。

最佳答案

From what I understand, (1) You need to call drain every time write is called. (2) If not I guess, write will block the loop thread

两者都不正确,但混淆是可以理解的。 write() 的工作方式如下:

  • write() 的调用只是将数据存储到缓冲区,将其留给事件循环以在稍后实际写出,而无需程序进一步干预.就应用程序而言,数据在后台写入的速度与另一方接收数据的速度一样快。换句话说,每个 write() 都会安排它的数据使用尽可能多的操作系统级写入来传输,这些写入在相应的文件描述符实际可写时发出。所有这一切都会自动发生,甚至无需等待 drain()

  • write() 不是协程,它绝对从不阻塞事件循环。

第二个属性听起来很方便——你可以在任何需要的地方调用write(),甚至是从一个不是async def的函数调用——但它实际上是一个主要的write() 的>缺陷。流 API 公开的写入与接受数据的操作系统完全分离,因此如果您写入数据的速度快于网络对等体读取数据的速度,内部缓冲区将不断增长,您将拥有 memory leak。在你的手上。 drain() 修复了这个问题:如果写入缓冲区变得太大,等待它会暂停协程,并在 os.write() 执行后再次恢复协程后台成功,缓冲区缩小。

您不需要在每次 写入之后等待drain(),但您确实需要偶尔等待它,通常是在write() 被调用。例如:

while True:
    response = await peer1.readline()
    peer2.write(b'<response>')
    peer2.write(response)
    peer2.write(b'</response>')
    await peer2.drain()

drain() 如果待处理的未写入数据量很小,则立即返回。如果数据超过高阈值,drain() 将暂停调用协程,直到待处理的未写入数据量降至低阈值以下。暂停将导致协程停止从 peer1 读取数据,这又会导致对等方减慢它向我们发送数据的速率。这种反馈称为背压。

Buffering should be handled inside write function and application should not care.

这几乎就是 write() 现在的工作方式 - 它确实处理缓冲并且让应用程序不关心,无论是好是坏。另见 this answer了解更多信息。


解决问题的编辑部分:

Reading the answer and links again, I think the the functions work like this.

write() 还是比那个聪明一点。它不会尝试只写一次,它实际上会安排数据继续写入,直到没有数据可写为止。即使您从不 await drain() 也会发生这种情况 - 应用程序唯一必须做的就是让事件循环运行足够长的时间以写出所有内容。

一个更正确的writedrain伪代码可能是这样的:

class ToyWriter:
    def __init__(self):
        self._buf = bytearray()
        self._empty = asyncio.Event(True)

    def write(self, data):
        self._buf.extend(data)
        loop.add_writer(self._fd, self._do_write)
        self._empty.clear()

    def _do_write(self):
        # Automatically invoked by the event loop when the
        # file descriptor is writable, regardless of whether
        # anyone calls drain()
        while self._buf:
            try:
                nwritten = os.write(self._fd, self._buf)
            except OSError as e:
                if e.errno == errno.EWOULDBLOCK:
                    return  # continue once we're writable again
                raise
            self._buf = self._buf[nwritten:]
        self._empty.set()
        loop.remove_writer(self._fd, self._do_write)

    async def drain(self):
        if len(self._buf) > 64*1024:
            await self._empty.wait()

实际实现起来比较复杂,因为:

  • 它写在 Twisted 之上样式 transport/protocol层与自己的复杂 flow control , 不在 os.write;
  • 之上
  • drain() 并没有真正等到缓冲区为空,而是直到它到达 low watermark。 ;
  • _do_write 中引发的 EWOULDBLOCK 以外的异常被存储并在 drain() 中重新引发。

最后一点是调用 drain()另一个很好的理由 - 实际注意到对等点由于写入失败而消失了。

关于python - 为什么要显式调用 asyncio.StreamWriter.drain?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53779956/

相关文章:

python - 让 pubsubhubbub 集线器工作

python - 整个项目中可见的模块/类 - Python

python - 使用上一行值来计算日志

python - 在运行 Sophos 的机器上,为什么我的所有浏览器都无法从我的 Python 应用程序实时接收服务器发送的事件 (sse)?

python - 可取消的 Asyncio 键盘输入

python - 如何增加Python异步循环警告的阈值时间?

python - 没有类的 Pygame 碰撞

python - 将 Pandas 数据框转换为结构化数组

python - 将 django-celery 与 redis 一起使用时出现 NotRegistered 异常

python-3.x - 如何从大字典中列出的每个类别的所有可能的值组合中创建数据框