java - 如何使用kafka流处理 block /批处理数据?

标签 java scala apache-spark apache-kafka apache-kafka-streams

对于大数据中的许多情况,最好一次处理一小段记录缓冲区,而不是一次处理一条记录。

自然的例子是调用一些支持批处理的外部 API 以提高效率。

我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。

到目前为止我有:

builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")

我想要的是:

builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")

在 Scala 和 Akka Streams 中,该函数称为 groupedbatch。在 Spark Structured Streaming 中,我们可以执行 mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))

最佳答案

似乎还不存在。关注这个空间https://issues.apache.org/jira/browse/KAFKA-7432

关于java - 如何使用kafka流处理 block /批处理数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52366623/

相关文章:

python - 有没有办法将实时流从 azure blob 存储复制到 azure my sql

java - 运费计划的交付等级输出不正确

java - 这个java程序中的while循环有什么问题?

scala - Scala 中类型类的命名约定是什么?

scala - 参数数量可变的Spark Sql udf

apache-spark - 如何在pyspark中使用第一个和最后一个函数?

java - 如何检查JTable中的值是唯一的?

java - 无论 Java 中的计时器如何,Tomcat 的重新启动都会使作业每次都运行

arrays - 在 Scala 中包含 10^7 个元素的向量

scala - 使用泛型强制 Scala 特征的类型