python - 如何从三重奏 ReceiveStream 一次读取一行?

标签 python python-trio

asyncio 有 StreamReader.readline() ,允许这样的事情:

while True:
    line = await reader.readline()
    ...

(我没有看到 async for 在 asyncio 中可用,但那将是明显的演变)

我如何用 trio 达到同等效果?

我在 trio 0.9 中没有直接看到对此有任何高级支持。我只看到ReceiveStream.receive_some()返回任意大小的二进制 block ;对我来说,解码并将其转换为行式的东西似乎很重要。有没有我可以使用的标准库函数或代码片段?我发现 io stdlib 模块看起来很有前途,但我看不到任何提供“feed”方法的方法。

最佳答案

你是对的,目前 Trio 中没有对此的高级支持。应该有一些东西,虽然我不是 100% 确定它应该是什么样子。我打开了an issue进行讨论。

与此同时,您的实现看起来很合理。

如果你想让它更健壮,你可以 (1) 使用 bytearray 而不是 bytes 作为你的缓冲区,来添加和删除摊销的 O( n) 而不是 O(n^2),(2) 限制最大行长度,因此邪恶的同行不能强制你浪费无限内存缓冲无限长的行,(3) 恢复对 的每次调用在最后一个停止的地方找到,而不是每次都从头开始,再次避免 O(n^2) 行为。如果您只处理合理的线路长度和行为良好的同行,那么这些都不是特别重要,但也没有坏处。

这是您的代码的一个调整版本,它试图合并这三个想法:

class LineReader:
    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        self._line_generator = self.generate_lines(max_line_length)

    @staticmethod
    def generate_lines(max_line_length):
        buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                # no b'\n' found in buf
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                # next time, start the search where this one left off
                find_start = len(buf)
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del buf[:newline_idx+1]
                # next time, start the search from the beginning
                find_start = 0
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

(随意在您喜欢的任何许可下使用。)

关于python - 如何从三重奏 ReceiveStream 一次读取一行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53575979/

相关文章:

python - 使用 Python 将 XML 模式定义解析为 CSV

Python/Selenium,如何访问没有id的html列表,但页面上有多个相同类的列表

python - BeautifulSoup:如何用跨度标签替换内容

Python:同步三重任务和常规线程的方法

python - 如果失败,不会取消所有任务的三重奏托儿所

python - 如何防止 child 破坏整个托儿所而引发的异常

python - 在 Spyder 中关闭多行提示

python - 在不正确的 merge 之前了解用于硬重置的pycharm提交日志