akka - Kafka 和 Akka 集群

标签 akka apache-kafka akka-cluster

以下是我的用例

  • 一堆应用程序将消息放入 Kafka 中不同主题的队列。
  • 让每个主题的使用者将工作分发给集群中的工作人员。工作可以分类为长时间运行、内存密集型、简单等,并相应地选择工作人员。

  • 这让我探索 Akka 集群的工作分配、路由和扩展。我可以使用 Akka“Supervisor”作为 Kafka 消费者,并根据其分类将传入的工作分配给适当的工作人员。

    但是我仍然试图理解的是在 Akka 集群中实现主管和工作人员之间的弹性通信方式的正确方法。因为一旦主管消费了来自 Kafka 的消息,就会提交 Kafka 偏移量。如果在偏移量提交后的处理过程中发生了一些错误,那么以下可以接受的方式来恢复并从上次离开的地方开始吗?

    通过使用由 Kafka 支持的持久邮箱使主管成为持久参与者。 Supervisor 将工作在 Kafka 中排队,而 worker 从 Kafka 获取其工作并仅在完成工作后提交其偏移量。

    最佳答案

    正如 Jaakko 所说,这实际上取决于您使用的第三方库。

    就我而言,我已成功使用 Akka Streams Kafka虽然我确实启用了偏移自动提交。

    但是,这个库可能会满足您的需求,因为它允许您自定义偏移提交(请参阅部分 External Offset StorageOffset Storage in Kafka )。

    文档说:

    The Consumer.committableSource makes it possible to commit offset positions to Kafka. Compared to auto-commit this gives exact control of when a message is considered consumed.



    为了禁用自动提交,你必须完成你的 Akka application.conf通过添加 akka.kafka.consumer 文件部分:
    akka.kafka.consumer {
    
      # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
      # can be defined in this configuration section.
    
      kafka-clients {
        # Disable auto-commit by default
        enable.auto.commit = false
      }
    
    }
    

    最新版本 akka-stream-kafka_2.11 (版本 0.16)兼容 Akka 2.5.x但是您必须使用 Akka 工具包之一覆盖 akka-stream_2.11 依赖项。目前,我正在将这个库与 Akka 一起使用 2.5.3而且效果很好。

    希望你能找到你要找的东西:)

    关于akka - Kafka 和 Akka 集群,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36536654/

    相关文章:

    java - 如何在 Java 应用程序中使用 Akka Actors?

    cassandra - 卡夫卡连接 : Error in sink cassandra connector

    akka - 测试参与者发送者是本地还是远程

    singleton - 存在单例时如何在 akka 集群中配置停机

    java - Akka 集群、集群分片和集群单例用例

    scala - 使用 Play 2.6 和 akka 流的 Websocket 代理

    java - Akka 心跳延迟

    scala - 使用 akka 路由 dsl 获取 http header

    apache-kafka - Debezium-不包含连接器类型

    java - Maven 冲突依赖 kafka-stream-test-utils 和 kafka-streams