java - Kafka 主题丢失消息

标签 java apache-kafka kafka-producer-api

尝试 ProducerRecord 中的时间戳时;我发现了一些奇怪的事情。从生产者发送几条消息后,我运行 kafka-console-consumer.sh 并验证这些消息是否在主题中。我叫住了制片人,等了一会儿。当我重新运行 kafka-console-consumer.sh 时,它没有显示我之前生成的消息。我还添加了 Producer.flush() 和 Producer.close() 但结果仍然相同。

现在,当我停止使用时间戳字段时,一切都工作正常,这让我相信带有时间戳的消息有些挑剔。

我使用的是Kafka_2.11-2.0.0(2018年7月30日发布)

以下是示例代码。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internal.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import static java.lang.Thread.sleep;
public class KafkaProducerSample{
    public static void main(String[] args){
        String kafkaHost="sample:port";
        String notificationTopic="test";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaHost);
        props.put(ProducerConfig.ACKS_CONFIG, 1);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        Producer<String, String> producer = new KafkaProducer(props, new StringSerialize(), new StringSerializer);

        RecordHeaders recordHeaders = new RecordHeader();
        ProducerRecord<String, String> record = new ProducerRecord(notificationTopic, null, 1574443515L, sampleKey, SampleValue);
        producer.send(record);
        sleep(1000);
    }
}

我按如下方式运行控制台消费者

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap.server KAFKA_HOST:PORT --topic test --from-beginning

#output after running producer
test


#output 5mins after shutting down producer

最佳答案

您仅异步发送一条记录,但不确认或刷新缓冲区。

您将需要发送更多记录,

producer.send(record).get();

producer.send(record);
producer.flush();

或者(首选),在主方法中执行 Runtime.addShutdownHook() 来刷新并关闭生产者

关于java - Kafka 主题丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58999519/

相关文章:

java - Unix 纪元时间(以秒为单位)转换为 Joda DateTime

java - 不同类型的 SQLite 时间戳值

java - Apache Storm : storm-kafka-monitor script throws exception

apache-kafka - 解释 Kafka 中的复制偏移检查点和恢复点偏移

java - Kafka 0.10 Java 客户端 TimeoutException : Batch containing 1 record(s) expired

java - 错误 : swap(Object[], int,int) 在集合中具有私有(private)访问权限

Java:组合多个谓词

java - 使用 slf4j 和 kafka 进行日志记录

apache-spark - Apache Spark + Delta Lake 概念

hadoop - Apache Kafka 是否将消息内部存储在 HDFS 或其他文件系统中