scala - 使用返回 Future 的函数映射 Stream

标签 scala stream future

我有时会发现自己有一些 Stream[X] , 和 function X => Future Y ,我想合并到 Future[Stream[Y]] ,而且我似乎找不到办法做到这一点。例如,我有

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)

val result : Future[Stream[String]] = ???

我试过
 val result = Future.Traverse(x, toFutureString)

这给出了正确的结果,但似乎在返回 Future 之前消耗了整个流,这或多或少地打败了目的

我试过
val result = x.flatMap(toFutureString)

但这不能与 type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?] 一起编译
val result = x.map(toFutureString)

返回有点奇怪和无用的 Stream[Future[String]]
我应该怎么做才能解决问题?

编辑:我没有被困在 Stream 上,我对 Iterator 上的相同操作同样感到满意,只要它在开始处理头部之前不会阻止评估所有项目

Edit2:我不是 100% 确定 Future.Traverse 构造需要在返回 Future[Stream] 之前遍历整个流,但我认为确实如此。如果没有,这本身就是一个很好的答案。

Edit3:我也不需要结果是有序的,我对流或迭代器返回的任何顺序都很好。

最佳答案

您与 traverse 走在正确的轨道上,但不幸的是,在这种情况下,标准库的定义看起来有点困惑——它不应该在返回之前消耗流。
Future.traverse是一个更通用的函数的特定版本,它适用于任何包裹在“可遍历”类型中的应用仿函数(例如,请参阅 these papers 或我的回答 here 以获取更多信息)。

Scalaz库提供了这个更通用的版本,在这种情况下它按预期工作(请注意,我正在从 Future 获取 scalaz-contrib 的应用仿函数实例;它还没有在 Scalaz 的稳定版本中,它仍然是交叉的针对 Scala 2.9.2 构建,它没有这个 Future ):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._

import ExecutionContext.Implicits.global

def toFutureString(value: Int) = Future(value.toString)

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

这会立即在无限流上返回,因此我们确定它不会首先被消耗。

作为脚注:如果您查看 the sourceFuture.traverse你会看到它是按照 foldLeft 实现的,这很方便,但在流的情况下不是必需的或不合适的。

关于scala - 使用返回 Future 的函数映射 Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18043749/

相关文章:

java - 如何使用 ScalaCheck 测试 Java 程序?

scala - 如果 SparkSession 没有关闭会发生什么?

Scala 主要参与者不匹配 react block 中的案例

python - 将 json 对象插入数据湖

java - 如何中断 Future,但仍然等待它完成?

c++ - 如何从 C++11 中的 std::future move 结果?

arrays - 如何在Scala中高效地将阵列复制到另一个阵列?

java - 如何通过 Akka Streams for java 创建和使用 Pair 流?

c++ - 嵌套模板 C++ STL 的重载流运算符

rust - 没有为实现 `poll` 的类型找到名为 `Future` 的方法