我们正在尝试使用 Akka Streams 和 Alpakka Kafka 来使用服务中的事件流。为了处理事件处理错误,我们使用了 Kafka 自动提交和多个队列。例如,如果我们有主题 user_created
,我们想从产品服务中消费,我们还创建了 user_created_for_products_failed
和 user_created_for_products_dead_letter
.这两个额外的主题与特定的 Kafka 消费者群体相关联。如果一个事件处理失败,它会进入失败的队列,我们在五分钟后尝试再次消费——如果它再次失败,它就会进入死信。
在部署时,我们希望确保不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但尚未处理所有这些“飞行”事件。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。
阅读文档后,我们看到了 KillSwitch
特征。我们在其中看到的问题是 shutdown
方法返回 Unit
相反 Future[Unit]
正如我们所料。我们不确定使用它会不会丢失事件,因为在测试中它看起来太快而无法正常工作。
作为一种解决方法,我们创建了一个 ActorSystem
对于每个流并使用 terminate
方法(返回 Future[Terminate]
)。这个解决方案的问题是我们不认为创建 ActorSystem
每个流都会很好地扩展,并且 terminate
需要很多时间来解决(在测试中最多需要一分钟才能关闭)。
你遇到过这样的问题吗?是否有更快的方法(与 ActorSystem.terminate
相比)来停止流并确保所有事件发生在 Source
已发出已处理?
最佳答案
来自 documentation (强调我的):
When using external offset storage, a call to
Consumer.Control.shutdown()
suffices to complete theSource
, which starts the completion of the stream.
val (consumerControl, streamComplete) =
Consumer
.plainSource(consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, 0) -> offset
))
.via(businessFlow)
.toMat(Sink.ignore)(Keep.both)
.run()
consumerControl.shutdown()
Consumer.control.shutdown()
返回 Future[Done]
.从它的 Scaladoc 描述:Shutdown the consumer
Source
. It will wait for outstanding offset commit requests to finish before shutting down.
或者,如果您使用的是 Kafka 中的偏移存储 , 使用
Consumer.Control.drainAndShutdown
,它也返回一个 Future
.再次来自文档(其中包含有关 drainAndShutdown
在幕后做了什么的更多信息):val drainingControl =
Consumer
.committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
.mapAsync(1) { msg =>
business(msg.record).map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
val streamComplete = drainingControl.drainAndShutdown()
drainAndShutdown
的 Scaladoc 描述:Stop producing messages from the
Source
, wait for stream completion and shut down the consumerSource
so that all consumed messages reach the end of the stream. Failures in stream completion will be propagated, the source will be shut down anyway.
关于scala - 以编程方式停止 Alpakka Kafka 流的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55046169/