我有时会发现自己有一些 Stream[X]
, 和 function X => Future Y
,我想合并到 Future[Stream[Y]]
,而且我似乎找不到办法做到这一点。例如,我有
val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)
val result : Future[Stream[String]] = ???
我试过
val result = Future.Traverse(x, toFutureString)
这给出了正确的结果,但似乎在返回 Future 之前消耗了整个流,这或多或少地打败了目的
我试过
val result = x.flatMap(toFutureString)
但这不能与
type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]
一起编译val result = x.map(toFutureString)
返回有点奇怪和无用的
Stream[Future[String]]
我应该怎么做才能解决问题?
编辑:我没有被困在
Stream
上,我对 Iterator
上的相同操作同样感到满意,只要它在开始处理头部之前不会阻止评估所有项目Edit2:我不是 100% 确定 Future.Traverse 构造需要在返回 Future[Stream] 之前遍历整个流,但我认为确实如此。如果没有,这本身就是一个很好的答案。
Edit3:我也不需要结果是有序的,我对流或迭代器返回的任何顺序都很好。
最佳答案
您与 traverse
走在正确的轨道上,但不幸的是,在这种情况下,标准库的定义看起来有点困惑——它不应该在返回之前消耗流。Future.traverse
是一个更通用的函数的特定版本,它适用于任何包裹在“可遍历”类型中的应用仿函数(例如,请参阅 these papers 或我的回答 here 以获取更多信息)。
Scalaz库提供了这个更通用的版本,在这种情况下它按预期工作(请注意,我正在从 Future
获取 scalaz-contrib
的应用仿函数实例;它还没有在 Scalaz 的稳定版本中,它仍然是交叉的针对 Scala 2.9.2 构建,它没有这个 Future
):
import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._
import ExecutionContext.Implicits.global
def toFutureString(value: Int) = Future(value.toString)
val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString
这会立即在无限流上返回,因此我们确定它不会首先被消耗。
作为脚注:如果您查看 the source为
Future.traverse
你会看到它是按照 foldLeft
实现的,这很方便,但在流的情况下不是必需的或不合适的。
关于scala - 使用返回 Future 的函数映射 Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18043749/