scala - 以编程方式停止 Alpakka Kafka 流的正确方法

标签 scala apache-kafka akka akka-stream alpakka

我们正在尝试使用 Akka Streams 和 Alpakka Kafka 来使用服务中的事件流。为了处理事件处理错误,我们使用了 Kafka 自动提交和多个队列。例如,如果我们有主题 user_created ,我们想从产品服务中消费,我们还创建了 user_created_for_products_faileduser_created_for_products_dead_letter .这两个额外的主题与特定的 Kafka 消费者群体相关联。如果一个事件处理失败,它会进入失败的队列,我们​​在五分钟后尝试再次消费——如果它再次失败,它就会进入死信。

在部署时,我们希望确保不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但尚未处理所有这些“飞行”事件。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。

阅读文档后,我们看到了 KillSwitch 特征。我们在其中看到的问题是 shutdown方法返回 Unit相反 Future[Unit]正如我们所料。我们不确定使用它会不会丢失事件,因为在测试中它看起来太快而无法正常工作。

作为一种解决方法,我们创建了一个 ActorSystem对于每个流并使用 terminate方法(返回 Future[Terminate] )。这个解决方案的问题是我们不认为创建 ActorSystem每个流都会很好地扩展,并且 terminate需要很多时间来解决(在测试中最多需要一分钟才能关闭)。

你遇到过这样的问题吗?是否有更快的方法(与 ActorSystem.terminate 相比)来停止流并确保所有事件发生在 Source已发出已处理?

最佳答案

来自 documentation (强调我的):

When using external offset storage, a call to Consumer.Control.shutdown() suffices to complete the Source, which starts the completion of the stream.


val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()
Consumer.control.shutdown()返回 Future[Done] .从它的 Scaladoc 描述:

Shutdown the consumer Source. It will wait for outstanding offset commit requests to finish before shutting down.



或者,如果您使用的是 Kafka 中的偏移存储 , 使用 Consumer.Control.drainAndShutdown ,它也返回一个 Future .再次来自文档(其中包含有关 drainAndShutdown 在幕后做了什么的更多信息):
val drainingControl =
  Consumer
    .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record).map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()
drainAndShutdown 的 Scaladoc 描述:

Stop producing messages from the Source, wait for stream completion and shut down the consumer Source so that all consumed messages reach the end of the stream. Failures in stream completion will be propagated, the source will be shut down anyway.

关于scala - 以编程方式停止 Alpakka Kafka 流的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55046169/

相关文章:

java - 如果我尝试将值放入 View 类中的 map 中,为什么会出现错误?

scala - 使用spark将数据写入cassandra

NAT 后面的 akka 集群节点(使用 docker)

java - 如何从 MultilayerPerceptronClassifier 获取分类概率?

Scala 流 : how to avoid to keeping a reference to the head (and other elements)

scala - 可以对 scala dsl 进行编码以发出其自己的专门编译错误吗?

docker - 为什么kafka docker需要监听unix套接字

elasticsearch - 如何激活和配置 ElasticSearch Kafka Connect 接收器?

scala - 使用 Akka 构建工作流引擎

java - Akka如何连接源、流和汇