java - Apache Kafka 消息消费确认

标签 java apache-kafka

我已经实现了一个 Java 使用者,它使用来自 Kafka 主题的消息,然后将这些消息与 POST 请求一起发送到 REST API。

    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(200);
        for (ConsumerRecord<String, Object> record : records) {
            CloseableHttpClient httpClient = HttpClientBuilder.create().build();  
            Object message = record.value();
            JSONObject jsonObj = new JSONObject(message.toString());
            try {
                HttpPost request = new HttpPost(this.getConsumerProperties().getProperty("api.url"));
                StringEntity params = new StringEntity(message.toString());
                request.addHeader("content-type", "application/json");
                request.addHeader("Accept", "application/json");
                request.setEntity(params);

                CloseableHttpResponse response = httpClient.execute(request);
                HttpEntity entity = response.getEntity();
                String responseString = EntityUtils.toString(entity, "UTF-8");
                System.out.println(message.toString());
                System.out.println(responseString);
            } catch(Exception ex) {
              ex.printStackTrace();
            }  finally {
                try {   
                    httpClient.close(); 
                } catch(IOException ex) {
                    ex.printStackTrace();
                } 
            }                  
        } 
    }   

假设一条消息已被使用,但 Java 类无法访问 REST API。该消息将永远不会被传递,但会被标记为已使用。处理此类情况的最佳方法是什么?当且仅当来自 REST API 的响应成功时,我能否以某种方式确认消息?

最佳答案

在消费者属性中,将 enable.auto.commit 设置为 false。 这意味着提交补偿的责任在于消费者。

因此,在上面的示例中,基于 response.statusCode,您可以选择通过调用 consumer.commitAsync() 来提交偏移量。

关于java - Apache Kafka 消息消费确认,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50713020/

相关文章:

javascript - java HTTP 503 错误(不是由于服务器过载)

apache-kafka - kafka组协调器如何确定它已收到所有JoinGroup请求?

java - Kafka 领导人选举导致 Kafka Streams 崩溃

java - 强制组件从顶部堆叠,而不是垂直居中

java - 将另一个类的 JTextarea 输出组合到主类框架的 JTextArea 中?

docker - 在docker容器中运行kafka

java - 当 Broker 不可用时,消息不会出现在 Spring Integration (Kafka) ErrorChannel 中

apache-kafka - Kafka 如何处理运行速度比其他消费者慢的消费者?

java - 尝试使用 java 从 Gmail 读取电子邮件时出现 MessagingException

java - 如何从 Java 中的 while 循环返回 count 的值