scala - Akka 流,条件 divertLeft

标签 scala akka akka-stream

我有一个 divertLeft 函数,基于此处的代码:https://bszwej.medium.com/akka-streams-error-handling-7ff9cc01bc12 ,这会将 Lefts 转移到指定的接收器,并将 Rights 转发给下游消费者。

def divertLeft(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
  flow.via {
    Flow[(Either[L, R], CO)]
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) => (element, c) }
          .to(to),
        _._1.isLeft
      )
      .collect { case (Right(element), c) => (element, c) }
  }
}

上游的实际处理,在某些情况下返回 Left(new Exception(...)),在其他情况下返回 Left(new Error(...)) ,我想以不同的方式处理这些。

这相当不可爱,但我希望它能起作用......

def divertLeftIgnoreError(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
  /* As above, but if the Left value is an Error, then ignore it instead of diverting to the given destination */
  flow.via {
    Flow[(Either[L, R], CO)]
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) if element.isInstanceOf[Error] =>  (element, c) }
          .to(Sink.ignore),
        _._1.isLeft
      )
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) if !element.isInstanceOf[Error] => (element, c) }
          .to(to),
        _._1.isLeft
      )
      .collect { case (Right(element), c) => (element, c) }
  }
}

...事实并非如此。所有的 Left 似乎都被忽略了,可能是因为 .collect 没有做我认为它做的事情,所以消息只是从函数的末尾掉下来而没有被处理。你甚至可以用这种方式编写 divertTo 吗?

我还考虑过为“_._1.isLeft”编写一个谓词(传递给 divertTo),并且 Left 的内容也是一个异常”,但我无法弄清楚其语法。

尝试处理不同类型的左派是否从根本上来说是考虑不周的?如果是这样,我应该使用什么模式来处理这个问题?

最佳答案

正如您所想,您的问题出在 divertTo 的谓词函数中。任何与谓词匹配的元素都将被转移。

就您而言,首先 divertTo 转移所有 Left。然后接收器仅收集错误并将它们发送到忽略接收器。其他 Leftcollect 过滤掉。

您想要的确实是一个更精确的谓词,就像您在 collectcase 中编写的谓词:

item => item._1 match {
  case Left(e) if e.isInstanceOf[Error] => true
  case _ => false
}

(这只是如何编写它的示例,实际上还有其他方法可以编写它,例如更内联的 _._1.left.exists(_.isInstanceOf[Error]),选择您喜欢的)。

对每个谓词执行相同的操作,它应该按预期工作。

请注意,如果您要忽略 Left(Error(..),您可以先过滤它们并保留一个 divertTo

关于scala - Akka 流,条件 divertLeft,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76656138/

相关文章:

scala - akka-http 中的连接池使用源队列实现线程安全吗?

akka - 在 Slick 事务中处理 Akka 流

scala - 如果测试失败,如何配置 ScalaTest 以中止套件?

scala - 如何在 Scala 3 宏中创建泛型类型的实例?

Scala - 集合比较 - 为什么 Set(1) == ListSet(1)?

Java/Scala 反射 : Get class methods in order and force object init

具有远程节点 : Disassociated exception 的 Akka (.net) 集群

scala - akka typed 中的询问模式示例

scala - Akka 远程处理和 Heroku

scala - Akka Streams 中的 Monadic 短路