我有一个生产者和消费者架构,其中我的生产者返回一个迭代器,而我的消费者期望一些转换后的结果。两者都超出了我的控制范围。现在我的代码负责转换源流。问题之一是源吞吐量不可靠。它将以不同的速率产生记录。有时太慢。
是否可以在映射阶段终止流?我确实有一个可以设置来终止该进程的标志。顺便说一句,我无法将 Futures 和超时放置在消费者之外。
我尝试过的事情:
在 map 内击杀。如果一段时间内没有生成记录,则永远不会触发此条件,这会带来缺点。
source.map(x=> {if(System.currentTimeMillis()>limit) kill(); x})
另一个选择是使用一段时间。但是,它不能从 while 中产生。
while(source.hasNext()){
Try(Await.result(Future{source.next()}, limit))
match {
case _@Failure(e)=> kill()
case bla..
}
}
有什么创新的想法吗?
最佳答案
如果没有有关您正在处理的类型的更多详细信息,就很难掌握情况。
我想知道您是否不能只用自己的变压器 Iterator
包装源 Iterator
。
class Transformer[A,B](src :A) extends Iterator[B] {
private var nextB :B = _
def hasNext :Boolean = {
// pull next element from src
// if successful load nextB and return true else return false
}
def next() :B = nextB
}
然后,您可以简单地让 toStream
创建一个 Stream[B]
,该流将在某个时刻终止。
sendToConsumer((new Transformer(source)).toStream)
关于从内部使迭代器映射超时的 Scala 惯用方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50843268/