java - Spark 流 : Writing number of rows read from a Kafka topic

标签 java apache-spark apache-kafka spark-structured-streaming

Spark 流作业正在从繁忙的 kafka 主题中读取事件。为了了解每个触发间隔有多少数据进入,我只想输出从主题读取的行数。我尝试了多种方法来做到这一点,但无法弄清楚。

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", topic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
//          .option("failOnDataLoss", false)
//          .option("maxOffsetsPerTrigger", 10000)
          .load();
      stream.selectExpr("topic").agg(count("topic")).as("count");
      //stream.selectExpr("topic").groupBy("topic").agg(count(col("topic")).as("count"));
      stream.writeStream()
            .format("console")
            .option("truncate", false)
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .start();

最佳答案

看来你需要

stream = stream.selectExpr("topic").agg(count("topic")).as("count");

然后你就可以打印了

关于java - Spark 流 : Writing number of rows read from a Kafka topic,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53227798/

相关文章:

java - 什么决定了 log4j TimeBasedRollingPolicy 何时翻转?

java - C# 类似属性 get/set,而不是私有(private)变量和单独的 getter/setter 方法

来自远程服务器的 MS SQL Server 2005 的 Java 数据库连接问题

scala - Spark中是否有类似twitter.scalding.addTrap的API处理异常

algorithm - 乘以集合并随机与其他集合合并 - Apache Spark

batch-file - Apache Spark : batch processing of files

apache-kafka - Kafka block.on.buffer.full 默认值

docker - 从 Docker 容器将 PySpark 连接到 Kafka

java - 按照计时器刷新 jTextField

node.js - 将消息从 AWS Lambda 推送到 Kafka