我有一个窗口KTable
按预期工作,但每次收到新值时它都会输出。我找到了 .suppress
运算符,它完全符合我的要求:仅在时间窗口结束时输出结果。我已向 TimeWindow
添加了 grace
值,但无法使 .suppress
与窗口化的 KTable
配合使用。 The answer to this question shows what .suppress should look like.
在我阅读 Apache's documentation 时似乎如此, untilWindowCloses
是 Suppressed 接口(interface)的一种方法,这意味着我无法实例化 Suppressed
对象,对吗?我不确定如何以这种方式实现接口(interface)(在窗口 KTable
上的 .suppress
参数中)。
我觉得我错过了一些愚蠢的东西,但我已经搜索了又搜索但无法弄清楚。有什么想法吗?
TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10));
final KTable<Windowed<String>, GenericRecord> joinedKTable = groupedStream
.windowedBy(window)
.reduce(new Reducer<GenericRecord>() {
@Override
public GenericRecord apply(GenericRecord aggValue, GenericRecord newValue) {
//reduce code
}
})
.suppress(Suppressed.untilWindowCloses(unbounded())); //need help here
我正在使用 Eclipse,它告诉我“方法 unbounded() 未定义。”
我做错了什么?
最佳答案
您需要静态导入 unbounded()
,或限定引用。
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
或
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
关于java - 当抑制窗口 KTable 的输出时,如何正确实现缓冲区配置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57860132/