我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。
即 "1-new"
, "1-old"
, "2-new"
, "2-old"
.键是唯一的,但有些可能会丢失。
现在使用 Kotlin 和 KafkaStreams API,我可以记录那些具有相同 key ID 的新旧消息。
val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())
val newStream = stream.filter({ key, _ -> isNew(key) })
.map({key, value -> KeyValue(key.replace(NEW_PREFIX, ""), value) })
val oldStream = stream.filter({ key, _ -> isOld(key) })
.map({key, value -> KeyValue(key.replace(OLD_PREFIX, ""), value) })
val joined = newStream.join(oldStream,
{ value1, value2 -> "$value1&$value2" }, windows)
joined.foreach({ key, value ->
log.info { "JOINED $key : $value" }
})
现在我想知道 中缺少的新/旧 key 时间窗口因为某些原因。是否可以使用 KafkaStreams API 实现?
在我的情况下,当 key
"1-old"
收到和 "1-new"
不是只有在这种情况下 2 分钟内我想报告 ID 1
因为可疑。
最佳答案
DSL 可能不会给你你想要的。但是,您可以使用处理器 API。话虽如此,leftJoin
实际上可以用来做“举重”。因此,在 leftJoin
之后您可以使用 .transform(...)
带有附加状态以进一步“清理”数据。
每个old&null
记录您收到,放入商店。如果您稍后收到 old&new
您可以将其从商店中删除。此外,您注册一个标点符号,并且在每次标点符号调用时,您都会扫描商店中“足够旧”的条目,以便您确定以后不会old&new
将产生连接结果。对于这些条目,您发出 old&null
并从商店中取出。
作为替代方案,您也可以省略连接,并在单个 transform()
中完成所有操作。与状态。为此,您需要KStream#merge()
新旧流与呼transform()
在合并的流上。
注意:除了注册标点符号之外,您还可以将“扫描逻辑”放入转换并在每次处理记录时执行它。
关于apache-kafka - 与 KafkaStreams 的窗口结束外连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48196450/