asyncio 有 StreamReader.readline()
,允许这样的事情:
while True:
line = await reader.readline()
...
(我没有看到 async for
在 asyncio 中可用,但那将是明显的演变)
我如何用 trio 达到同等效果?
我在 trio 0.9 中没有直接看到对此有任何高级支持。我只看到ReceiveStream.receive_some()
返回任意大小的二进制 block ;对我来说,解码并将其转换为行式的东西似乎很重要。有没有我可以使用的标准库函数或代码片段?我发现 io stdlib 模块看起来很有前途,但我看不到任何提供“feed”方法的方法。
最佳答案
你是对的,目前 Trio 中没有对此的高级支持。应该有一些东西,虽然我不是 100% 确定它应该是什么样子。我打开了an issue进行讨论。
与此同时,您的实现看起来很合理。
如果你想让它更健壮,你可以 (1) 使用 bytearray
而不是 bytes
作为你的缓冲区,来添加和删除摊销的 O( n) 而不是 O(n^2),(2) 限制最大行长度,因此邪恶的同行不能强制你浪费无限内存缓冲无限长的行,(3) 恢复对 的每次调用在最后一个停止的地方找到
,而不是每次都从头开始,再次避免 O(n^2) 行为。如果您只处理合理的线路长度和行为良好的同行,那么这些都不是特别重要,但也没有坏处。
这是您的代码的一个调整版本,它试图合并这三个想法:
class LineReader:
def __init__(self, stream, max_line_length=16384):
self.stream = stream
self._line_generator = self.generate_lines(max_line_length)
@staticmethod
def generate_lines(max_line_length):
buf = bytearray()
find_start = 0
while True:
newline_idx = buf.find(b'\n', find_start)
if newline_idx < 0:
# no b'\n' found in buf
if len(buf) > max_line_length:
raise ValueError("line too long")
# next time, start the search where this one left off
find_start = len(buf)
more_data = yield
else:
# b'\n' found in buf so return the line and move up buf
line = buf[:newline_idx+1]
# Update the buffer in place, to take advantage of bytearray's
# optimized delete-from-beginning feature.
del buf[:newline_idx+1]
# next time, start the search from the beginning
find_start = 0
more_data = yield line
if more_data is not None:
buf += bytes(more_data)
async def readline(self):
line = next(self._line_generator)
while line is None:
more_data = await self.stream.receive_some(1024)
if not more_data:
return b'' # this is the EOF indication expected by my caller
line = self._line_generator.send(more_data)
return line
(随意在您喜欢的任何许可下使用。)
关于python - 如何从三重奏 ReceiveStream 一次读取一行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53575979/