multithreading - 通过 Akka 应用程序中的上下文切换导致 CPU 使用率高

标签 multithreading scala akka cpu-usage context-switch

我正在维护和开发两个 Akka Scala 应用程序,它们与串行设备交互以收集传感器信息。两者之间的主要区别在于,一个(我的 CO2 传感器应用程序)使用 1% 的 CPU,而另一个(我的功率传感器应用程序)使用 250% 的 CPU。 Linux 机器(Raspberry Pi 3)和我的 Windows 台式 PC 上都是这种情况。代码方面的主要区别在于 CO2 直接使用串行库(http://fazecast.github.io/jSerialComm/),而功率传感器应用程序通过中间件层将串行库的输入/输出流转换为 Akka 源/接收器,如下所示:

  val port = SerialPort.getCommPort(comPort)

  port.setBaudRate(baudRate)
  port.setFlowControl(flowControl)
  port.setComPortParameters(baudRate, dataBits, stopBits, parity)
  port.setComPortTimeouts(timeoutMode, timeout, timeout)

  val isOpen = port.openPort()

  if(!isOpen) {
    error(s"Port $comPort could not opened. Use the following documentation for troubleshooting: https://github.com/Fazecast/jSerialComm/wiki/Troubleshooting")

    throw new Exception("Port could not be opened")
  }

  (reactive.streamSource(port.getInputStream), reactive.streamSink(port.getOutputStream))

当我看到如此高的 CPU 使用率时,我立即对它使用了 Profiler (VisualVM),它告诉我以下信息:
Profiler screenshot

在谷歌搜索 Unsafe.park 后,我找到了以下答案:https://stackoverflow.com/a/29414580/1122834 - 使用这些信息,我检查了有和没有我的电源传感器应用程序的上下文切换量,结果非常清楚问题的根本原因:
pi@dex:~ $ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
10  0  32692  80144  71228 264356    0    0     0     5    7    8 38  5 55  2  0
 1  0  32692  80176  71228 264356    0    0     0    76 12932 18856 59  6 35  0  0
 1  0  32692  80208  71228 264356    0    0     0     0 14111 20570 60  8 32  0  0
 1  0  32692  80208  71228 264356    0    0     0     0 13186 16095 65  6 29  0  0
 1  0  32692  80176  71228 264356    0    0     0     0 14008 23449 56  6 38  0  0
 3  0  32692  80208  71228 264356    0    0     0     0 13528 17783 65  6 29  0  0
 1  0  32692  80208  71228 264356    0    0     0    28 12960 16588 63  6 31  0  0

pi@dex:~ $ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0  32692 147320  71228 264332    0    0     0     5    7    8 38  5 55  2  0
 0  0  32692 147296  71228 264332    0    0     0    84  963 1366  0  0 98  2  0
 0  0  32692 147296  71228 264332    0    0     0     0  962 1347  1  0 99  0  0
 0  0  32692 147296  71228 264332    0    0     0     0  947 1318  1  0 99  0  0

如您所见,仅通过终止我的应用程序,上下文切换的数量就每秒减少了约 12000 次。我继续检查哪些确切的线程正在执行此操作,似乎 Akka 真的很渴望做一些事情:
Profiler threads

这里的评论和另一个 SO question 都指向调整 Akka 的并行设置。我将以下内容添加到我的 application.conf - 没有结果。
akka {
  log-config-on-start = "on"
  actor{
    default-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "fork-join-executor"
      # Configuration for the fork join pool
      default-executor {
        fallback = "fork-join-executor"
      }
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 1
        # Parallelism (threads) ... ceil(available processors * factor)
        parallelism-factor = 1.0
        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 1
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 1
    }
  }
  stream{
    default-blocking-io-dispatcher {
      type = PinnedDispatcher
      executor = "fork-join-executor"
      throughput = 1

      thread-pool-executor {
        core-pool-size-min = 1
        core-pool-size-factor = 1.0
        core-pool-size-max = 1
      }
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
    }
  }
}

这似乎提高了 CPU 使用率(100% -> 65%),但 CPU 使用率仍然过高。

更新 21-11-'16
看来问题出在我的图表内。当不运行图表时,CPU 使用率立即下降到正常水平。图表如下:
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val responsePacketSource = serialSource
    .via(Framing.delimiter(ByteString(frameDelimiter), maxFrameLength, allowTruncation = true))
    .via(cleanPacket)
    .via(printOutput("Received: ",debug(_)))
    .via(byteStringToResponse)

  val packetSink = pushSource
    .via(throttle(throttle))

  val zipRequestStickResponse = builder.add(Zip[RequestPacket, ResponsePacket])
  val broadcastRequest = builder.add(Broadcast[RequestPacket](2))
  val broadcastResponse = builder.add(Broadcast[ResponsePacket](2))

  packetSink ~> broadcastRequest.in
  broadcastRequest.out(0) ~> makePacket ~> printOutput("Sent: ",debug(_)) ~> serialSink
  broadcastRequest.out(1) ~> zipRequestStickResponse.in0

  responsePacketSource ~> broadcastResponse.in
  broadcastResponse.out(0).filter(isStickAck) ~> zipRequestStickResponse.in1
  broadcastResponse.out(1).filter(!isStickAck(_)).map (al => {
    val e = completeRequest(al)
    debug(s"Sinking:          $e")
    e
  }) ~> Sink.ignore

  zipRequestStickResponse.out.map { case(request, stickResponse) =>
    debug(s"Mapping: request=$request, stickResponse=$stickResponse")
    pendingPackets += stickResponse.sequenceNumber -> request
    request.stickResponse trySuccess stickResponse
  } ~> Sink.ignore

  ClosedShape
})

streamGraph.run()

从 broadcastResponse 中删除过滤器时,CPU 使用率下降到正常水平。这让我相信 zip 永远不会发生,因此,图表进入了不正确的状态。

最佳答案

问题是 Fazecast 的 jSerialComm 库有许多不同的超时模式。

static final public int TIMEOUT_NONBLOCKING = 0x00000000;
static final public int TIMEOUT_READ_SEMI_BLOCKING = 0x00000001;
static final public int TIMEOUT_WRITE_SEMI_BLOCKING = 0x00000010;
static final public int TIMEOUT_READ_BLOCKING = 0x00000100;
static final public int TIMEOUT_WRITE_BLOCKING = 0x00001000;
static final public int TIMEOUT_SCANNER = 0x00010000;

使用非阻塞 read()方法 ( TIMEOUT_NONBLOCKING ) 在与 Akka Stream 的 InputStreamPublisher 结合使用时会导致 CPU 使用率非常高。 .为了防止这种情况,只需使用 TIMEOUT_READ_SEMI_BLOCKINGTIMEOUT_READ_BLOCKING .

关于multithreading - 通过 Akka 应用程序中的上下文切换导致 CPU 使用率高,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40675360/

相关文章:

scala - Akka 流 : dealing with futures within graph stage

java - 如何避免每个线程创建不同的池?

c++ - memcpy 的多线程编程

scala - Spark 1.6 将函数应用于名称中带有点的列/如何正确转义 colName

scala - 匹配 Akka 中的值类

sbt - 用于 Akka/Play 监控的 Typesafe 控制台发生了什么变化?

c++ - 停止所有 C++ 线程

c# - SSIS 跨进程通信有哪些选项?

scala - 有没有办法使用 Gradle 的 Scala 3 编译器(Dotty)?

scala - 为什么 IDEA 14 报告 "Cannot load facet "Scala": Unknown type of facet "scala""?