scala - 在 Akka Stream - Scala 中抛出异常的最佳实践是什么?

标签 scala akka akka-stream

我是 Akka Stream 的新手,正在尝试找出处理流内意外行为的最佳实践。

我正在构建一个流,它使用 Account Kit API 将短期代码交换为长期访问 token 。

这里是构建流的相关代码:

override def ExchangeCode: Flow[String, AccessTokenInfo, NotUsed] =
Flow[String].mapAsync(1) { code =>
  ws.url(buildUrl("access_token"))
    .withQueryString(
      "grant_type" -> "authorization_code",
      "code" -> code,
      "access_token" -> appToken
    )
    .withRequestTimeout(timeout)
    .get
    .map { response =>
      if (response.status != 200) throw new RuntimeException("Unexpected response")
      else response.json
    }
    .map { json =>
      AccessTokenInfo("123456", 123, "123456")
    }
}

我想知道如果状态代码不是 200 则抛出异常是否是处理它的正确方法,但据我所知,这是提前终止流的唯一方法。 (目前返回值为虚拟值)

最佳答案

如果状态不是 200,为什么流终止是绝对必要的?通常,您甚至希望将故障发送到下游,以便 Flow 的任何用户都能收到通知并采取相应行动。

处理此类故障的典型方法是使用 Try 。稍微修改您的流程:

//                                  Add Try Output
//                                       |
//                                       v
override def ExchangeCode: Flow[String, Try[AccessTokenInfo], _] = 
  Flow[String] 
    ...
    .map { response => response.status match {
        case 200 => Try { response.json }
        case _   => Try { throw new RuntimeException("Unexpected Response") }
      }
    }
    .map( _ map (json => AccessTokenInfo("123456", 123, "123456")))

现在,如果可以检索或获取异常并能够处理失败情况,则流的任何用户都可以获得有效的访问 token 。

替代返回类型:选项

如果只有 1 个异常只能由 1 个原因产生,那么 Option 也是一个可行的返回类型:

override def ExchangeCode: Flow[String, Option[AccessTokenInfo], _] = 
  ...
        case 200 => Some(response.json)
        case _   => None
  ...

关于scala - 在 Akka Stream - Scala 中抛出异常的最佳实践是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41542808/

相关文章:

scala - 任意类型A的函数组合

scala - Scala 中的 Akka,感叹号和问号

scala - Akka http 丢失发件人引用

scala - 在 Akka-Http 中完成 Source[ByteString, _]

scala - Scala 中带有隐式参数的函数类型

Scala全部功能作为部分功能

java - 使用Java将Mat复制到OpenCV中的原始数组? (出现 "multiple of channels count"错误)

scala - 将并行集合与Akka混合

scala - 使用 TCP 流并将其重定向到另一个 Sink(使用 Akka Streams)

akka-stream - akka http 客户端 system.shutdown() 使用 https 时产生 "Outgoing request stream error (akka.stream.AbruptTerminationException)"