apache-kafka - 与 KafkaStreams 的窗口结束外连接

标签 apache-kafka outer-join apache-kafka-streams

我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。
"1-new" , "1-old" , "2-new" , "2-old" .键是唯一的,但有些可能会丢失。

现在使用 Kotlin 和 KafkaStreams API,我可以记录那些具有相同 key ID 的新旧消息。

    val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())

    val newStream = stream.filter({ key, _ -> isNew(key) })
            .map({key, value ->  KeyValue(key.replace(NEW_PREFIX, ""), value) })

    val oldStream = stream.filter({ key, _ -> isOld(key) })
            .map({key, value ->  KeyValue(key.replace(OLD_PREFIX, ""), value) })

    val joined = newStream.join(oldStream,
            { value1, value2 -> "$value1&$value2" }, windows)

    joined.foreach({ key, value ->
        log.info { "JOINED $key : $value" }
    })

现在我想知道 中缺少的新/旧 key 时间窗口因为某些原因。是否可以使用 KafkaStreams API 实现?

在我的情况下,当 key "1-old"收到和 "1-new"不是只有在这种情况下 2 分钟内我想报告 ID 1因为可疑。

最佳答案

DSL 可能不会给你你想要的。但是,您可以使用处理器 API。话虽如此,leftJoin实际上可以用来做“举重”。因此,在 leftJoin 之后您可以使用 .transform(...)带有附加状态以进一步“清理”数据。

每个old&null记录您收到,放入商店。如果您稍后收到 old&new您可以将其从商店中删除。此外,您注册一个标点符号,并且在每次标点符号调用时,您都会扫描商店中“足够旧”的条目,以便您确定以后不会old&new将产生连接结果。对于这些条目,您发出 old&null并从商店中取出。

作为替代方案,您也可以省略连接,并在单个 transform() 中完成所有操作。与状态。为此,您需要KStream#merge()新旧流与呼transform()在合并的流上。

注意:除了注册标点符号之外,您还可以将“扫描逻辑”放入转换并在每次处理记录时执行它。

关于apache-kafka - 与 KafkaStreams 的窗口结束外连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48196450/

相关文章:

java - 在 Kafka Streams 应用程序中,是否有一种方法可以使用输出主题的通配符列表来定义拓扑?

elasticsearch - Confluent 5.4.0 ElasticSearch Sink连接器中没有连接器类型错误

java - 如何将我的容器化项目连接到在本地主机上运行的 kafka?

java - 永远的卡夫卡消费者民意调查

database - 哈希全外连接如何工作?

apache-kafka - Kafka Streams - 低级处理器 API - RocksDB TimeToLive(TTL)

apache-kafka - 为什么 kafka 中的 __consumer_offsets 主题没有传播到所有经纪人?

php - 多个左连接,如何在php中输出

r - data.table 默认外部连接与 setkey,计算行(列出键的频率)

java - Kafka DSL Kstream-> Ktable Join-连接的序列化编译错误