apache-spark - 如何获取Spark Streaming处理的记录总数?

标签 apache-spark

有谁知道 Spark 如何计算其记录数(我认为它与批处理中的事件数相同),如此处所示?

enter image description here

我正在尝试弄清楚如何远程获取此值(UI 中的流选项不存在 REST-API)。

基本上,我正在尝试执行此操作来获取我的应用程序处理的记录总数。我需要此信息用于门户网站。

我尝试计算每个阶段的记录,但它给我的数字与上图完全不同。每个阶段都包含有关其记录的信息。如图所示

enter image description here

我正在使用这个简短的 python 脚本来计算每个阶段的“inputRecords”。这是源代码:

import json, requests, urllib
print "Get stages script started!"
#URL REST-API
url = 'http://10.16.31.211:4040/api/v1/applications/app-20161104125052-0052/stages/'
response = urllib.urlopen(url)
data = json.loads(response.read())

stages = []
print len(data)
inputCounter = 0
for item in data:
        stages.append(item["stageId"])
        inputCounter += item["inputRecords"]
print "Records processed: " + str(inputCounter)

如果我理解正确的话:每个Batch有一个Job,每个Job有多个Stages,这些阶段有多个任务

因此对我来说,计算每个阶段的输入是有意义的。

最佳答案

Spark 在驱动程序上提供了一个指标端点:

<driver-host>:<ui-port>/metrics/json

Spark Streaming 应用程序将报告 UI 中可用的所有指标以及更多指标。您可能正在寻找的是:

<driver-id>.driver.<job-id>.StreamingMetrics.streaming.totalProcessedRecords: {
value: 48574640
},
<driver-id>.driver.<job-id>.StreamingMetrics.streaming.totalReceivedRecords: {
value: 48574640
}

此端点可以自定义。请参阅Spark Metrics获取信息。

关于apache-spark - 如何获取Spark Streaming处理的记录总数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40421368/

相关文章:

apache-spark - 当Kafka客户端异步发送消息时,Spark流任务正常关闭

java - Spark 作业无法在 Kubernetes 集群上启动

java - 在 Apache Spark 中跨执行器共享数据

scala - Spark - 一次通过 RDD 上的多个过滤器

java - 尽管完全不使用或引用Hadoop,但Apache Spark运行时异常 “Unable to load native-hadoop library for your platform”

scala - 来自 Dataset 的 RDD 导致 Spark 2.x 的序列化错误

scala - 快速获取数据框中的记录数

apache-spark - Spark独立安装无法连接到master

apache-spark - Spark sql 抛出 UTF8 字符串转换错误

java - 没有编码器发现嵌套 Java 类的错误