java - 当抑制窗口 KTable 的输出时,如何正确实现缓冲区配置?

标签 java apache-kafka apache-kafka-streams

我有一个窗口KTable按预期工作,但每次收到新值时它都会输出。我找到了 .suppress 运算符,它完全符合我的要求:仅在时间窗口结束时输出结果。我已向 TimeWindow 添加了 grace 值,但无法使 .suppress 与窗口化的 KTable 配合使用。 The answer to this question shows what .suppress should look like.

在我阅读 Apache's documentation 时似乎如此, untilWindowCloses 是 Suppressed 接口(interface)的一种方法,这意味着我无法实例化 Suppressed 对象,对吗?我不确定如何以这种方式实现接口(interface)(在窗口 KTable 上的 .suppress 参数中)。

我觉得我错过了一些愚蠢的东西,但我已经搜索了又搜索但无法弄清楚。有什么想法吗?

TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10));

final KTable<Windowed<String>, GenericRecord> joinedKTable = groupedStream
    .windowedBy(window)
    .reduce(new Reducer<GenericRecord>() {
        @Override
        public GenericRecord apply(GenericRecord aggValue, GenericRecord newValue) {
            //reduce code
        }
    })
    .suppress(Suppressed.untilWindowCloses(unbounded())); //need help here

我正在使用 Eclipse,它告诉我“方法 unbounded() 未定义。”我做错了什么?

最佳答案

您需要静态导入 unbounded() ,或限定引用。

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))

关于java - 当抑制窗口 KTable 的输出时,如何正确实现缓冲区配置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57860132/

相关文章:

java - 弱引用 get() 方法的安全性如何? (安卓,异步任务)

java - 如何将 JTable 添加到从 JScrollPane 扩展的类中?

java - Equals() 方法帮助

java - Kafka消费者集群环境偏移

apache-kafka - 如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?

java - Jersey - 如何模拟服务

apache-kafka - kafka 启动失败(版本 0.8.0 beta1 )

java - 如何根据某些值字段在运行时创建 Kafka 主题名称

java - 当 PROCESSING_GUARANTEE_CONFIG 设置为 EXACTLY_ONCE 时,Kafka 无法重新平衡

apache-kafka - 我应该用什么 : Kafka Stream or Kafka consumer api or Kafka connect