java - Apache Kafka - 是否有可能因错误而丢失消息?

标签 java apache-kafka spring-cloud-stream

我正在深入研究 Apache KafkaSpring Cloud Stream并观察到一些行为让我想知道我是否做错了什么或者它是否按预期工作 - 我毫不怀疑:

有可能在出错时丢失消息!?

我的设置尽可能简单。单个 Kafka 代理和一个只有 1 个分区的主题。 Broker、topic、producer 和 consumer 具有默认设置(auto-ack 为真)。

测试用例 1

  • 产生message1
  • 产生message2
  • 启动一个消费者,它将在收到任何消息时抛出 RuntimeException
  • 消费message1,重试
  • 消费message1,重试
  • 消费message1,重试
  • 抛出异常
  • 消费message2,重试
  • 消费message2,重试
  • 消费message2,重试
  • 抛出异常
  • 停止并重启消费者
  • 消费message1,重试
  • 消费message1,重试
  • 消费message1,重试
  • 抛出异常
  • 消费message2,重试
  • 消费message2,重试
  • 消费message2,重试
  • 抛出异常

按预期工作。

测试用例 2

  • 产生message1
  • 产生message2
  • 启动一个消费者,它会在收到完全message1
  • 时抛出 RuntimeException
  • 消费message1,重试
  • 消费message1,重试
  • 消费message1,重试
  • 抛出异常
  • 成功消费message2
  • 产生message3
  • 成功消费message3
  • 停止并重启消费者
  • 没有任何反应,消费者等待新消息被消费

message1 将被跳过,因为提交的偏移量已设置为 message3。这就是困扰我的地方。只要先前的消息未成功处理,我不希望消费者继续处理消息。

有没有人经历过相同的行为和/或可以指导我如何改变这种行为?

提前致谢!


更新:根据要求,一些代码片段

创建主题

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

连接生产者

kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

创建一个maven项目

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.7.RELEASE</version>
    <relativePath/>
</parent>

...

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>


<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

添加以下application.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test-topic
          contentType: text/plain
          group: test-group
          consumer:
            header-mode: raw
      kafka:
        binder:
          zkNodes: localhost:2181
          brokers: localhost:9092

添加以下Application.java

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    private void consume(Message<String> message) {
        log.info("Received: {}", message.getPayload());
        if ("message1".equals(message.getPayload())
            throw new RuntimeException();
        log.info("Successfully processed message {}", message.getPayload());
    }
}

应该是这样的。运行应用程序并使用控制台生产者生成消息。

最佳答案

在 Kafka 中,每条消息都带有一个偏移量 ID。您的消费者应用程序可以检查偏移量,如果有任何偏移量被跳过或丢失,而不是使用下一条消息。您可以使用 consumer.seek 方法获取丢失的特定消息。

抵消在本质上是递增的和连续的。

在你的情况下使用手动提交。

我可以说使用以下步骤..

  1. 在 poll 方法之后,首先检查之前提交的偏移量和 并请求下一个偏移值

  2. 一旦消息被成功消费和处理,保存 某些内部成功处理消息的偏移值 内存或表。在下一次投票中

下面的链接不会为您的用例服务,但您可以获得公平的想法

引用Example

关于java - Apache Kafka - 是否有可能因错误而丢失消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46705471/

相关文章:

apache-kafka - 仅启用一次时,Kafka 流中的 UnknownProducerIdException

java - android播放声音效果。

java - 如何通过互联网在两台计算机之间发送数据

java - 我不知道为什么 ArrayIndexOutOfBoundsException 出现在单独链接中

docker - 为什么我应该将docker image “confluentinc/kafka”用于kafka集群?

java - Kafka Streams API 中的 ArrayList Serde 问题

Spring Cloud 流 MessageChannel send() 总是返回 true

java - 处理死信队列消息代理独立的方式

spring-boot - 如何在@StreamListener 中添加日期条件

java - 回复 : Convert Double to byte[] array