scala - Scala 和 akka-http 休息服务的断路器

标签 scala akka reactive-programming akka-http

我目前正在使用 Akka-HTTP 实现断路器,如下所示:

 def sendMail(entity: MyEntity): ToResponseMarshallable = {

      Thread.sleep(5 * 1000)
      validateEntity(entity).map[ToResponseMarshallable] {
        case (body, subject) if !isEmpty(body, subject) => {
          val mailResponse = sendMail(body, subject)
          OK -> ProcessedEmailMessage(mailResponse)
        }
        case _ =>
          BadRequest -> s"error: for $entity".toJson
      }
    } catch {
      case e: DeserializationException => HttpResponse(BadRequest).withEntity(HttpEntity(s"error:${e.msg}").withContentType(ContentTypes.`application/json`))
    }
  }

  val maxFailures: Int = 2
  val callTimeout: FiniteDuration = 1 second
  val resetTimeout: FiniteDuration = 30 seconds

  def open: Unit = {
    logger.info("Circuit Breaker is open")
  }

  def close: Unit = {
    logger.info("Circuit Breaker is closed")
  }

  def halfopen: Unit = {
    logger.info("Circuit Breaker is half-open, next message goes through")

  private lazy val breaker = CircuitBreaker(
    system.scheduler,
    maxFailures,
    callTimeout,
    resetTimeout
  ).onOpen(open).onClose(close).onHalfOpen(halfopen)

  def routes: Route = {
    logRequestResult("email-service_aggregator_email") {
      pathPrefix("v1") {
        path("sendmail") {
          post {
            entity(as[EmailMessage]) { entity =>
              complete {
                breaker.withCircuitBreaker(Future(sendMail(entity)))
              }
            }
          }
        }
      }
    }
  }

我的问题是,如果我使用 breaker.withCircuitBreaker(Future(sendMail(entity)))断路器进入断开状态,但其余响应返回 There was an internal server error作为回应

如果我使用 breaker.withSyncCircuitBreaker(Future(sendMail(entity)))然后断路器永远不会进入打开状态,但它返回预期的 HttpResponse
关于如何解决此问题以触发断路器并返回正确的 HTTP 响应的任何想法?

最佳答案

entity(as[EmailMessage]) { entity => ctx =>
  val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity)))
  val withErrorHandling = withBreaker.recover {
      case _: CircuitBreakerOpenException => 
        HttpResponse(TooManyRequests).withEntity("Server Busy")
  }
  ctx.complete(withErrorHandling)
}

关于scala - Scala 和 akka-http 休息服务的断路器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35724424/

相关文章:

java - 在 Java/Scala 项目中检查 null 与匹配

scala - 如何在 sbt 中检测 JavaFX 运行时 jar

java - 如何在同一线程中并行通量内运行单声道

scala - Akka向自己发送延迟消息无法找到隐式ExecutionContext

multithreading - 允许actor定期替换共享的不可变只读状态是 “ok”吗?

c# - 使用 ReactiveUI 禁用和启用文本字段更改上的按钮

kotlin - Mono.fromCallable 中的异常不会导致错误

scala - 使用spark-hbase时,出现ClassNotFoundException : org. apache.hadoop.hbase.spark.SparkSQLPushDownFilter

java - 当我运行应用程序时,我添加到 JPanel 的组件不会出现

java - Akka actorSelection(String) 不适用于 TCP