java - 为什么kafka流会重新处理broker重启后产生的消息

标签 java apache-kafka apache-kafka-streams

我有一个单节点 kafka 代理和简单的流应用程序。我创建了 2 个主题(主题 1 和主题 2)。

在 topic1 上生成 - 处理消息 - 写入 topic2

注意:对于生成的每条消息,仅将一条消息写入目标主题

我生成了一条消息。写入topic2后,我停止了kafka经纪人。一段时间后,我重新启动了代理并在 topic1 上生成了另一条消息。现在 Streams 应用程序处理了该消息 3 次。现在,在不停止代理的情况下,我向 topic1 生成了消息,并等待流应用程序写入 topic2,然后再次生成。

Streams 应用程序行为异常。有时对于一条生成的消息,有 2 条消息写入目标主题,有时有 3 条消息。我不明白为什么会发生这种情况。我的意思是,即使代理重新启动后生成的消息也会被重复。

更新1:

我使用的是 Kafka 版本 1.0.0 和 Kafka-Streams 版本 1.1.0

下面是代码。

Main.java

String credentials = env.get("CREDENTIALS");

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> activityStream = builder.stream("activity_contenturl");
KStream<String, String> activityResultStream = AppUtil.hitContentUrls(credentials , activityStream);
activityResultStream.to("o365_user_activity");

AppUtil.java

public static KStream<String, String> hitContentUrls(String credentials, KStream<String, String> activityStream) {

        KStream<String, String> activityResultStream = activityStream
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {

                        ArrayList<String> log = new ArrayList<String>();
                        JSONObject received = new JSONObject(value);
                        String url = received.get("url").toString();

                        String accessToken = ServiceUtil.getAccessToken(credentials);
                        JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);

                        log.add(activityLog.toString());
                    }
                    return log;
                }                   
            });

                return activityResultStream;
    }

更新2:

在具有上述配置的单个代理和单个分区环境中,我启动了 Kafka 代理和流应用程序。在源主题上生成了 6 条消息,当我在目标主题上启动消费者时,有 36 条消息并且还在计数中。他们继续来。

所以我运行这个来查看消费者组:

kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

输出:

streams-collection-app-0

接下来我运行了这个:

kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app-0

输出:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                                                HOST            CLIENT-ID
o365_activity_contenturl 0          1               1               0               streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer-3a2940c2-47ab-49a0-ba72-4e49d341daee /127.0.0.1      streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer

过了一会儿,输出显示:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
o365_activity_contenturl 0          1               6               5               -               -               -

然后:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
o365_activity_contenturl 0          1               7               6               -               -               -

最佳答案

您似乎面临着已知的限制。默认情况下,Kafka 主题将消息存储至少 7 天,但已提交的偏移量存储 1 天(默认配置值 offsets.retention.mines = 1440)。因此,如果超过 1 天没有向源主题生成消息,则在应用程序重新启动后,来自主题的所有消息将再次重新处理(实际上是多次,具体取决于重新启动的次数,每个此类主题每天最多 1 次,很少有传入消息) )。

您可以找到过期提交偏移量的描述 How does an offset expire for consumer group .

在 kafka 2.0 版本中,增加了已提交偏移量的保留 KIP-186: Increase offsets retention default to 7 days .

为了防止重新处理,您可以添加消费者属性auto.offset.reset:latest(默认值为earliest)。 latest 存在一个小风险:如果当天没有人在源主题中生成消息,并且在您重新启动应用程序之后,您可能会丢失一些消息(仅在重新启动期间准确到达的消息) .

关于java - 为什么kafka流会重新处理broker重启后产生的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50741783/

相关文章:

java - 在java中将KML文件转换为geoJson

java - 使用 Stream 从 int[] 数组中查找奇数

java - 卡夫卡 : java client failed to send messages after x tries

apache-kafka - BrokerNotAvailableError : Could not find the leader Exception while Spark Streaming

java - 在tomcat的线程池中,当线程返回池时,线程本地值是否会重置或删除?

apache-kafka - Kafka代替Zookeeper进行集群管理

java - 如何使用java修改一个kafka主题的消息并将其发送到另一个kafka主题?

apache-kafka - Kafka 将单个日志事件行聚合为组合日志事件

apache-kafka - Apache Kafka 如何与多个代理和单个代理一起工作

java - Java 集合框架中接口(interface)方法的实现发生在哪里?