java - KStream-KTable 使用 Exactly Once 配置内部连接丢失的消息

标签 java apache-kafka apache-kafka-streams

当我不设置时加工保证这意味着流将以其默认值( at_least_once )启动,此代码可以成功记录并将加入的消息发送到相关主题。

恰好_once 配置是 启用 在同一个流应用程序上,某些数据无法成功通过 join。即使有第一个 peek 块的日志,我也看不到一些第二个 peek 日志和一些我需要的消息。

我确信 kstream 和 ktable 都需要不为空的值。并且双方都定期收到消息。

流配置:

  • processing.guarantee=exactly_once
  • replication.factor=3(这会增加内部主题的复制因子)

  • Kafka (with 3 broker) 详情:
  • 版本=2.2.0
  • log.roll.ms=3600000
  • offsets.topic.replication.factor=3
  • transaction.state.log.replication.factor=3
  • transaction.state.log.min.isr=3
  • message.max.bytes=2000024

  • 问题是,如何恰好_once 加工保证设置会导致这种情况吗?
    final KStream<String, UserProfile> userProfileStream = builder.stream(TOPIC_USER_PROFILE);
    final KTable<String, Device> deviceKTable = builder.table(TOPIC_DEVICE);
    
    userProfileStream
    .peek((genericId, userProfile) ->
        log.debug("[{}] Processing user profile: {}", openUserId, userProfile)
    )
    .join(
        deviceKTable,
        (userProfile, device) -> {
            userProfile.setDevice(device);
    
            return userProfile;
        },
        Joined.with(Serdes.String(), userProfileSerde, deviceSerde)
    )
    .peek((genericId, userProfile) ->
        log.debug("[{}] Updated user profile: {}", genericId, userProfile)
    )
    .to(TOPIC_UPDATED_USER_PROFILE, Produced.with(Serdes.String(), userProfileSerde));
    

    最佳答案

    Confluent 邮件组中也正在讨论有关该问题的更多信息:https://groups.google.com/d/msg/confluent-platform/MRjz8MRBDCg/XbVlJI0hBAAJ

    关于java - KStream-KTable 使用 Exactly Once 配置内部连接丢失的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57989278/

    相关文章:

    docker - 可以在启动docker时通过传递的env变量配置kafka连接器吗?还是 curl 是唯一的方法?

    apache-kafka - 如何将现有的 kafka 主题分区分散到更多目录中?

    java - 使用 Stream DSL 在 Kafka Streams 中指定每个流/表时间戳提取器

    java - Transformer Kafka 中的 ManagedChannel 是线程安全的吗

    java - 处理 json 解析错误而不导致 Kafka 流处理器应用程序崩溃

    java - Android发送蓝牙消息接收空格字符

    java - 尝试使用 SmbFileInputStream

    apache-kafka - 如何配置 Confluent Platform Kafka 连接日志?

    java - 在运行时提升 Java 应用程序

    java - 如何在一个数组中使用不同的类对象?