我有一个 divertLeft
函数,基于此处的代码:https://bszwej.medium.com/akka-streams-error-handling-7ff9cc01bc12 ,这会将 Lefts 转移到指定的接收器,并将 Rights 转发给下游消费者。
def divertLeft(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
flow.via {
Flow[(Either[L, R], CO)]
.divertTo(
Flow[(Either[L, R], CO)]
.collect { case (Left(element), c) => (element, c) }
.to(to),
_._1.isLeft
)
.collect { case (Right(element), c) => (element, c) }
}
}
上游的实际处理,在某些情况下返回 Left(new Exception(...))
,在其他情况下返回 Left(new Error(...))
,我想以不同的方式处理这些。
这相当不可爱,但我希望它能起作用......
def divertLeftIgnoreError(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
/* As above, but if the Left value is an Error, then ignore it instead of diverting to the given destination */
flow.via {
Flow[(Either[L, R], CO)]
.divertTo(
Flow[(Either[L, R], CO)]
.collect { case (Left(element), c) if element.isInstanceOf[Error] => (element, c) }
.to(Sink.ignore),
_._1.isLeft
)
.divertTo(
Flow[(Either[L, R], CO)]
.collect { case (Left(element), c) if !element.isInstanceOf[Error] => (element, c) }
.to(to),
_._1.isLeft
)
.collect { case (Right(element), c) => (element, c) }
}
}
...事实并非如此。所有的 Left 似乎都被忽略了,可能是因为 .collect
没有做我认为它做的事情,所以消息只是从函数的末尾掉下来而没有被处理。你甚至可以用这种方式编写 divertTo
吗?
我还考虑过为“_._1.isLeft
”编写一个谓词(传递给 divertTo
),并且 Left 的内容也是一个异常”,但我无法弄清楚其语法。
尝试处理不同类型的左派是否从根本上来说是考虑不周的?如果是这样,我应该使用什么模式来处理这个问题?
最佳答案
正如您所想,您的问题出在 divertTo
的谓词函数中。任何与谓词匹配的元素都将被转移。
就您而言,首先 divertTo
转移所有 Left
。然后接收器仅收集错误并将它们发送到忽略接收器。其他 Left
被 collect
过滤掉。
您想要的确实是一个更精确的谓词,就像您在 collect
的 case
中编写的谓词:
item => item._1 match {
case Left(e) if e.isInstanceOf[Error] => true
case _ => false
}
(这只是如何编写它的示例,实际上还有其他方法可以编写它,例如更内联的 _._1.left.exists(_.isInstanceOf[Error])
,选择您喜欢的)。
对每个谓词执行相同的操作,它应该按预期工作。
请注意,如果您要忽略 Left(Error(..)
,您可以先过滤它们并保留一个 divertTo
。
关于scala - Akka 流,条件 divertLeft,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76656138/