对于大数据中的许多情况,最好一次处理一小段记录缓冲区,而不是一次处理一条记录。
自然的例子是调用一些支持批处理的外部 API 以提高效率。
我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。
到目前为止我有:
builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
我想要的是:
builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
在 Scala 和 Akka Streams 中,该函数称为 grouped
或 batch
。在 Spark Structured Streaming 中,我们可以执行 mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))
。
最佳答案
似乎还不存在。关注这个空间https://issues.apache.org/jira/browse/KAFKA-7432
关于java - 如何使用kafka流处理 block /批处理数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52366623/