以下是我的用例
这让我探索 Akka 集群的工作分配、路由和扩展。我可以使用 Akka“Supervisor”作为 Kafka 消费者,并根据其分类将传入的工作分配给适当的工作人员。
但是我仍然试图理解的是在 Akka 集群中实现主管和工作人员之间的弹性通信方式的正确方法。因为一旦主管消费了来自 Kafka 的消息,就会提交 Kafka 偏移量。如果在偏移量提交后的处理过程中发生了一些错误,那么以下可以接受的方式来恢复并从上次离开的地方开始吗?
通过使用由 Kafka 支持的持久邮箱使主管成为持久参与者。 Supervisor 将工作在 Kafka 中排队,而 worker 从 Kafka 获取其工作并仅在完成工作后提交其偏移量。
最佳答案
正如 Jaakko 所说,这实际上取决于您使用的第三方库。
就我而言,我已成功使用 Akka Streams Kafka虽然我确实启用了偏移自动提交。
但是,这个库可能会满足您的需求,因为它允许您自定义偏移提交(请参阅部分 External Offset Storage 和 Offset Storage in Kafka )。
文档说:
The Consumer.committableSource makes it possible to commit offset positions to Kafka. Compared to auto-commit this gives exact control of when a message is considered consumed.
为了禁用自动提交,你必须完成你的 Akka
application.conf
通过添加 akka.kafka.consumer
文件部分:akka.kafka.consumer {
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# can be defined in this configuration section.
kafka-clients {
# Disable auto-commit by default
enable.auto.commit = false
}
}
最新版本
akka-stream-kafka_2.11
(版本 0.16
)兼容 Akka 2.5.x
但是您必须使用 Akka 工具包之一覆盖 akka-stream_2.11 依赖项。目前,我正在将这个库与 Akka 一起使用 2.5.3
而且效果很好。希望你能找到你要找的东西:)
关于akka - Kafka 和 Akka 集群,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36536654/