scala - 如何以惯用的方式向 Akka 流添加错误日志记录?

标签 scala logging akka akka-stream

我目前正在运行类似于以下内容的 Akka 流设置:

                 ┌───────────────┐                 
┌─────────────┐  │┌─────────────┐│                 
│REST endpoint│──▶│Queue source ││                 
└─────────────┘  │└──────╷──────┘│                 
                 │┌──────▼──────┐│                 
                 ││   Flow[T]   ││                 
                 │└──────╷──────┘│                 
                 │┌──────▼──────┐│  ┌─────────────┐
                 ││  KafkaSink  │├─▶│ Kafka topic │
                 │└─────────────┘│  └─────────────┘
                 └───────────────┘                 

虽然这项工作很好,但我想对生产系统有一些了解,即是否存在错误以及什么样的错误。例如,我已经包装了 KafkaSinkRestartSink.withBackoff并将以下属性应用于包装的接收器:

private val decider: Supervision.Decider = {
  case x =>
    log.error("KafkaSink encountered an error and will stop", x)
    Supervision.Stop
}

Flow[...]
  .log("KafkaSink")
  .to(Producer.plainSink(...))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .addAttributes(
    ActorAttributes.logLevels(
      onElement = Logging.DebugLevel,
      onFinish = Logging.WarningLevel,
      onFailure = Logging.ErrorLevel
    )
  )

这确实为我提供了一些见解,例如我将收到一条日志消息,指出下游已关闭,以及通过 supervisionStrategy 发生的异常我添加的。

然而,这个解决方案感觉有点像一种变通方法(例如,将异常记录在监督策略中),并且它也没有提供对 RestartWithBackoffSink 行为的任何洞察。 .我当然可以启用 DEBUG该类的级别日志记录,但同样,这感觉像是在生产中做的一种解决方法。

长话短说:
  • 我试图深入了解我的 Akka 流中发生的错误的方式是否有任何明显的缺点
  • 是否有更好/更惯用的方法将日志记录添加到生产中的 Akka 流
  • 最佳答案

    我想你快到了!!

    其实就是documentation中描述的方式.使用 log() approach为流经流的元素、流的完成和失败提供更细粒度的日志记录级别控制。虽然,我不喜欢在主管策略中添加日志消息。如果您确实想显示该特定异常,则创建一个自定义异常,在主管策略中捕获它并让 Akka 为您记录该消息。您可以启用 debug-logging在 Akka 流中 config默认情况下是 off用于调试日志级别的其他故障排除日志记录。除此之外,您还可以在参与者级别启用日志记录。(请参阅此 documentation)。

    我认为在生产中,可能有两种方法来记录错误:

    1) 在恢复阶段记录或重新抛出异常。这样所有来自上游的异常都将被捕获并记录:

    object AkkaStreamRecap extends App {
    
      implicit val system = ActorSystem("AkkaStreamsRecap")
      implicit val materialiser = ActorMaterializer()
      import system.dispatcher
    
      val source = Source(-5 to 5) 
      val sink = Sink.foreach[Int](println)
      val flow = Flow[Int].map(x => 1 / x)
    
      val runnableGraph = source.
        via(flow).
        recover {
          case e => throw e
        }.
        to(sink)
    
      runnableGraph.run()
    }
    

    输出:
    0
    0
    0
    0
    -1
    [ERROR] [03/06/2020 16:27:58.703] [AkkaStreamsRecap-akka.actor.default-dispatcher-2] [akka://AkkaStreamsRecap/system/StreamSupervisor-0/flow-0-0-ignoreSink] Error in stage [Recover(<function1>)]: / by zero
    java.lang.ArithmeticException: / by zero
        at com.personal.akka.http.chapter1.AkkaStreamRecap$.$anonfun$flow$1(AkkaStreamRecap.scala:41)
        at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
        at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
        at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
        at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:480)
        at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
        at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
        at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:576)
        at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:682)
        at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:731)
        at akka.actor.Actor.aroundPreStart(Actor.scala:550)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:671)
        at akka.actor.ActorCell.create(ActorCell.scala:676)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
        at akka.dispatch.Mailbox.run(Mailbox.scala:228)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    

    2) 定义自定义监督策略并在流属性或物化器设置中使用它:
    object AkkaStreamRecap extends App {
    
      implicit val system = ActorSystem("AkkaStreamsRecap")
    
      private val decider: Supervision.Decider = {
        case e: ArithmeticException =>
          println("Arithmetic exception: Divide by Zero")
          Supervision.Stop
      }
    
      implicit val materialiser = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
    
      import system.dispatcher
    
    
      val source = Source(-5 to 5)
      val sink = Sink.foreach[Int](println)
      val flow = Flow[Int].map(x => 1 / x)
    
      val runnableGraph = source.via(flow).log("divide by zero").to(sink)
    
      runnableGraph.run()
    }
    

    输出:
    0
    0
    0
    0
    -1
    Arithmetic exception: Divide by Zero
    [ERROR] [03/06/2020 16:37:00.740] [AkkaStreamsRecap-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://AkkaStreamsRecap/system/StreamSupervisor-0)] [divide by zero] Upstream failed.
    java.lang.ArithmeticException: / by zero
        at com.personal.akka.http.chapter1.AkkaStreamRecap$.$anonfun$flow$1(AkkaStreamRecap.scala:26)
        at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
        at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
        at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
        at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:480)
        at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
        at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
        at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:576)
        at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:682)
        at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:731)
        at akka.actor.Actor.aroundPreStart(Actor.scala:550)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:671)
        at akka.actor.ActorCell.create(ActorCell.scala:676)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
        at akka.dispatch.Mailbox.run(Mailbox.scala:228)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    

    让我知道它是否有帮助!!

    P.S...我在官方文档中找不到任何关于记录错误的其他方法的来源或方法。

    关于scala - 如何以惯用的方式向 Akka 流添加错误日志记录?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60549592/

    相关文章:

    scala - 将 VectorAssembler 添加到 Spark ML Pipeline 时出错

    python - python logging.StreamHandler() 的包装输出

    java - 使用 Akka Actor 的文件操作

    java - Akka 心跳延迟

    scala - SBT 0.10 - 移除对 Scala 2.8.1 的依赖

    scala - 安装 Typesafe Stack 2.0 时 `g8` 做什么

    使用日志记录时 Python print()/sys.stdout.write() 不可见

    PHP Monolog 记录器 RotatingFileHandler 从不旋转文件

    scala - Actor 模型不是一种反模式吗,因为“一劳永逸”的风格迫使 Actor 记住一种状态?

    scala - `synchronized` 使用方式不同