scala - 与 Akka Streams 同步反馈

标签 scala akka-stream reactive-streams backpressure

我想要实现的目标是使用 akka 流实现同步反馈循环之类的东西。

假设您有一个 Flow[Int].filter(_ % 5 == 0)。当您将 Int 流广播到此流并将元组直接压缩到它后面时,您会得到类似于

(0,0)
(5,1)
(10,2)

有没有办法发出一个 Option[Int],它指示流是否在我推送下一个元素后发出一个元素?

(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...

我考虑过在 Flow 的前后实现我自己的 DetachedStage 来保持状态,每当流在舞台上拉动之前,我知道他需要下一个元素。当后面的stage没有收到元素时,就是None。

不幸的是,结果并不好,很多位置都偏离了。

边注

过滤器 Flow 只是一个例子,它可能是一个很长的流程,我无法提供在其中的每个阶段发出 Option 的能力,所以我真的必须知道, 流是否推送了下一个或者没有从下游请求下一个

我也尝试过 conflateexpand,但是这些我们在结果的位置偏移方面更糟

我在配置中更改的一件事是流的 initialmax 缓冲区,因此我可以确定指示的需求确实是在我推送的元素之后通过它。

如果能得到一些关于如何解决这个问题的建议,那就太好了!

最佳答案

我无法生成您正在寻找的内容。但我可以骗取你正在寻找的 future ,例如:

(Future(Some(0)), 0)
(Future(None)   , 1)
(Future(None)   , 2)
...

扩展您的示例,如果给定一个无法更改的流程:

val flow = Flow[Int].filter(_ % 5 == 0)

然后可以在单个输入上评估此流程,并将结果转换为 Option:

import scala.concurrent.{Future, Promise}
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.{Source,Sink}

def evalFlow(in : Int, flow : Flow[Int, Int, _])(implicit mat : Materializer, ec : ExecutionContext) = {
  val fut : Future[Int] = 
    Source.single(in)
          .via(flow)
          .runWith(Sink.head) //Throws an Exception if filter fails

  fut.map(Some(_))              //       val => Some(val)
     .fallbackTo(Promise(None)) // Exception => None
} 

此函数返回一个 Future[Option[Int]]。然后我们可以使用评估来简单地将结果与输入结合起来:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  (evalFlow(in, flow), in)//(Future[Option[Int]], Int)

最后,evalAndCombine 函数可以放在您的整数源之后:

import akka.actor.ActorSystem

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

val exampleSource = Source(() => (1 to 6).toIterator)

val tupleSource = exampleSource map evalAndCombine(flow)

同样,如果您想要 Future[(Option[Int], Int)] 而不是 (Future[Option[Int]], Int),例如:

Future[(Some(0), 0)]
Future[(None   , 1)]
...

然后稍微修改combine函数:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  evalFlow(in, flow) map (option => (option, in))//Future[(Option[Int], Int)]

关于scala - 与 Akka Streams 同步反馈,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31865789/

相关文章:

scala - Akka Streams TCP 套接字客户端终止

java - MongoDB Reactive Streams 在执行查询时挂起

json - 阿卡 HTTP : How to unmarshal Json format response into domain objects

scala - 为未使用的变量使用占位符时出现 MatchError

scala - 我可以参数化 Slick 查询宏吗?

scala - 这个 Scala 语法是什么意思——值定义后跟左大括号和缩进的东西?

scala - Scala 如何知道调用什么方法(命名参数)

scala - 数据流的 SHA256

project-reactor - 用于阻塞 I/O 任务的 ParallelFlux 与 flatMap()

scala - Akka Streams 自定义合并