java - Apache Camel - Kafka 组件 - 单生产者多消费者

标签 java apache-kafka apache-camel kafka-consumer-api kafka-producer-api

我正在创建两个apache Camel(蓝图XML)kafka项目,一个是kafka-生产者,它接受请求并将其存储在kafka服务器中,另一个是kafka-consumer,它从kafka服务器获取消息并处理它们。

此设置对于单个主题和单个消费者运行良好。但是,如何在同一个 kafka 主题中创建单独的消费者组?如何在不同消费者组内的同一主题内路由多个消费者特定消息?任何帮助表示赞赏。谢谢。

最佳答案

您的问题很笼统,因为不太清楚您要解决的问题是什么,因此很难理解是否有更好的方法来实现该解决方案。

无论如何,我们首先要说的是,据我所知,您正在寻找选择性消费者(EIP),这是 Kafka 和消费者 API 不支持的开箱即用的东西。选择性消费者可以根据生产者预先设置的特定选择器值,选择从队列或主题中选取哪些消息。这个功能也必须在消息代理中实现,但是kafka没有这样的能力。

Kafka 确实实现了纯 pub/sub 和队列之间的混合解决方案。话虽这么说,您可以做的是通过一个或多个消费者组订阅该主题(稍后会详细介绍),并通过检查消息本身来过滤掉您不感兴趣的所有消息。在消息传递和 EIP 领域,这种模式称为过滤器数组。正如您可以想象的那样,这会在消息广播给所有订阅者之后发生;因此,如果该解决方案不适合您的要求或上下文,那么您可以考虑实现一个基于内容的路由器,该路由器旨在仅在您的集中控制下将消息分派(dispatch)给消费者子集(这意味着中间消费者特定的 channel 可以当然是其他 Kafka 主题或 seda/VM 队列)。

转向第二个问题,这是 Kafka 组件的官方网站:https://camel.apache.org/components/latest/kafka-component.html 。 为了创建不同的消费者组,您只需定义多个路由,每个路由都有一个专用的 groupId。通过添加 groupdId 属性,您将通知消费者组协调员(驻留在 Kafka 代理中)有关多个独立消费者组的存在,并且代理将使用这些组来区分和单独对待它们(通过向他们发送每个消费者组的副本)存储在主题中的日志消息)...

这是一个例子:

public void configure() throws Exception {
    from("kafka:myTopic?brokers={{kafkaBootstrapServers}}" +
                 "&groupId=myFirstConsumerGroup"
            .log("Message received by myFirstConsumerGroup : ${body}");

    from("kafka:myTopic?brokers={{kafkaBootstrapServers}}" +
                 "&groupId=mySecondConsumerGroup"
            .log("Message received by mySecondConsumerGroup : ${body}");

}

正如你所看到的,我在同一个 RouteBuilder 中创建了两个路由,更不用说在同一个 Java 进程中了。在我能想到的大多数用例中,这是一个非常糟糕的设计决策,因为没有单一的责任,分离的关注点,并且它们无法扩展。但同样,这取决于您的要求/环境。 出于完整性考虑,请考虑查看所有其他 Kafka 组件属性,因为可能还有您感兴趣的许多其他配置,例如每组的消费者线程数。 我试图保持高水平,以便发起讨论......如果您有新的更新,我将编辑我的答案。希望我有所帮助!

关于java - Apache Camel - Kafka 组件 - 单生产者多消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61568806/

相关文章:

java - Android root 访问脚本

Java:在 HTTP get 请求中打印无关行

apache-spark - KStreams + Spark 流 + 机器学习

scala - 创建Kafka流的AbstractMethodError

apache-spark - 如何在使用 Spark Streaming 流式传输 kafka 时对消息进行重复数据删除?

java - 如何避免 Spring Boot 加载 EmbeddedWebApplicationContext?

java - 如何在 OSX 上从 Java 访问网络路径?

java - 我们可以在 apache Camel 中将 jmsxgroupid 与线程一起使用来解决线程 dsl 标记中的排序问题吗

java - @Service 和 @Repository bean 未在 Spring boot 和 Apache Camel 中初始化

java - 如何将一个倒计时器嵌套在另一个倒计时器中?