从内部使迭代器映射超时的 Scala 惯用方法?

标签 scala iterator future

我有一个生产者和消费者架构,其中我的生产者返回一个迭代器,而我的消费者期望一些转换后的结果。两者都超出了我的控制范围。现在我的代码负责转换源流。问题之一是源吞吐量不可靠。它将以不同的速率产生记录。有时太慢。

是否可以在映射阶段终止流?我确实有一个可以设置来终止该进程的标志。顺便说一句,我无法将 Futures 和超时放置在消费者之外。

我尝试过的事情:

在 map 内击杀。如果一段时间内没有生成记录,则永远不会触发此条件,这会带来缺点。

source.map(x=> {if(System.currentTimeMillis()>limit) kill(); x})

另一个选择是使用一段时间。但是,它不能从 while 中产生。

while(source.hasNext()){
    Try(Await.result(Future{source.next()}, limit))
    match {
        case _@Failure(e)=> kill()
        case bla..
    }
}

有什么创新的想法吗?

最佳答案

如果没有有关您正在处理的类型的更多详细信息,就很难掌握情况。

我想知道您是否不能只用自己的变压器 Iterator 包装源 Iterator

class Transformer[A,B](src :A) extends Iterator[B] {
  private var nextB :B = _
  def hasNext :Boolean = {
    // pull next element from src
    // if successful load nextB and return true else return false
  }

  def next() :B = nextB
}

然后,您可以简单地让 toStream 创建一个 Stream[B],该流将在某个时刻终止。

sendToConsumer((new Transformer(source)).toStream)

关于从内部使迭代器映射超时的 Scala 惯用方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50843268/

相关文章:

forms - 如何在Play中处理长格式和不同格式! Scala表格?

java - 从接口(interface)转换为特征

c++ - 如何将 vector 拆分为 n 个 "almost equal"部分

c++ - std::invoke_result_t 编译时语法错误

使用 IO::Async 和 Future::Utils 并发请求的 Perl 问题

scala - 覆盖 Predef 的隐式转换

scala - 如何列出 scala 子目录中的所有文件?

java - Eclipse 错误无法解析为类型

Java - 一旦发生 2 个 Callables 中的任何一个错误,就捕获错误

java - 为什么这个 Future 的方法阻塞了主线程?